Bug 4651: Implement handling of ClusteredDOMDataTreeChangeListener in CDS 43/29943/2
authorTom Pantelis <tpanteli@brocade.com>
Thu, 19 Nov 2015 11:26:10 +0000 (06:26 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 20 Nov 2015 12:17:53 +0000 (12:17 +0000)
Implemented handling of ClusteredDOMDataTreeChangeListener similar as to
what was done previously for ClusteredDOMDataChangeListener.

I also refactored the listener support classes used by Shard and
extracted generic base classes for the common functionality.

Change-Id: I694a6a4ce41284f7ecd3bf73bc6201e9d5555998
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
15 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java
new file mode 100644 (file)
index 0000000..a253b79
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EventListener;
+import java.util.Map.Entry;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractDataListenerSupport<L extends EventListener, R extends ListenerRegistrationMessage,
+        D extends DelayedListenerRegistration<L, R>, LR extends ListenerRegistration<L>>
+                extends LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
+    private final ArrayList<D> delayedListenerOnAllRegistrations = new ArrayList<>();
+    private final Collection<ActorSelection> actors = new ArrayList<>();
+
+    protected AbstractDataListenerSupport(Shard shard) {
+        super(shard);
+    }
+
+    @Override
+    void onLeadershipChange(boolean isLeader, boolean hasLeader) {
+        log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
+
+        final EnableNotification msg = new EnableNotification(isLeader);
+        for(ActorSelection dataChangeListener : actors) {
+            dataChangeListener.tell(msg, getSelf());
+        }
+
+        if(hasLeader) {
+            for(D reg : delayedListenerOnAllRegistrations) {
+                reg.createDelegate(this);
+            }
+
+            delayedListenerOnAllRegistrations.clear();
+            delayedListenerOnAllRegistrations.trimToSize();
+        }
+
+        if(isLeader) {
+            for(D reg : delayedListenerRegistrations) {
+                reg.createDelegate(this);
+            }
+
+            delayedListenerRegistrations.clear();
+            delayedListenerRegistrations.trimToSize();
+        }
+    }
+
+    @Override
+    void onMessage(R message, boolean isLeader, boolean hasLeader) {
+        log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
+
+        final ListenerRegistration<L> registration;
+        if((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
+            final Entry<LR, Optional<DataTreeCandidate>> res = createDelegate(message);
+            registration = res.getKey();
+        } else {
+            log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
+
+            D delayedReg = newDelayedListenerRegistration(message);
+            if(message.isRegisterOnAllInstances()) {
+                delayedListenerOnAllRegistrations.add(delayedReg);
+            } else {
+                delayedListenerRegistrations.add(delayedReg);
+            }
+
+            registration = delayedReg;
+        }
+
+        ActorRef registrationActor = newRegistrationActor(registration);
+
+        log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(),
+                registrationActor.path());
+
+        tellSender(newRegistrationReplyMessage(registrationActor));
+    }
+
+    protected Logger log() {
+        return log;
+    }
+
+    protected void addListenerActor(ActorSelection actor) {
+        actors.add(actor);
+    }
+
+    protected abstract D newDelayedListenerRegistration(R message);
+
+    protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
+
+    protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor);
+
+    protected abstract String logName();
+}
index 05accdd..f4b6bcc 100644 (file)
@@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
@@ -22,88 +20,16 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener,
-        DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-        Optional<DataTreeCandidate>> {
-    private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
-    private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
-    private final List<ActorSelection> dataChangeListeners =  new ArrayList<>();
-    private final List<DelayedListenerRegistration> delayedRegisterOnAllListeners = new ArrayList<>();
+final class DataChangeListenerSupport extends AbstractDataListenerSupport<
+        AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener,
+            DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
+                    AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
 
     DataChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
-    @Override
-    void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
-        LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader);
-
-        for (ActorSelection dataChangeListener : dataChangeListeners) {
-            dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
-        }
-
-        if(hasLeader) {
-            for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) {
-                registerDelayedListeners(reg);
-            }
-            delayedRegisterOnAllListeners.clear();
-        }
-
-        if (isLeader) {
-            for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
-                registerDelayedListeners(reg);
-            }
-
-            delayedListenerRegistrations.clear();
-        }
-    }
-
-    private void registerDelayedListeners(DelayedListenerRegistration reg) {
-        if(!reg.isClosed()) {
-            final Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-                            Optional<DataTreeCandidate>> res = createDelegate(reg.getRegisterChangeListener());
-            reg.setDelegate(res.getKey());
-            getShard().getDataStore().notifyOfInitialData(res.getKey(), res.getValue());
-        }
-    }
-
-    @Override
-    void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) {
-
-        LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}",
-            persistenceId(), message.getPath(), isLeader, hasLeader);
-
-        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                                                     NormalizedNode<?, ?>>> registration;
-        if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
-            final Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
-                    Optional<DataTreeCandidate>> res = createDelegate(message);
-            registration = res.getKey();
-
-            getShard().getDataStore().notifyOfInitialData(res.getKey(), res.getValue());
-        } else {
-            LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
-
-            DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
-            if(message.isRegisterOnAllInstances()) {
-                delayedRegisterOnAllListeners.add(delayedReg);
-            } else {
-                delayedListenerRegistrations.add(delayedReg);
-            }
-            registration = delayedReg;
-        }
-
-        ActorRef listenerRegistration = createActor(DataChangeListenerRegistrationActor.props(registration));
-
-        LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
-                persistenceId(), listenerRegistration.path());
-
-        tellSender(new RegisterChangeListenerReply(listenerRegistration));
-    }
-
     @Override
     Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
             Optional<DataTreeCandidate>> createDelegate(final RegisterChangeListener message) {
@@ -117,15 +43,41 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
         // Now store a reference to the data change listener so it can be notified
         // at a later point if notifications should be enabled or disabled
         if(!message.isRegisterOnAllInstances()) {
-            dataChangeListeners.add(dataChangeListenerPath);
+            addListenerActor(dataChangeListenerPath);
         }
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
                 new DataChangeListenerProxy(dataChangeListenerPath);
 
-        LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath());
+        log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
 
-        return getShard().getDataStore().registerChangeListener(message.getPath(), listener,
-                message.getScope());
+        Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+                Optional<DataTreeCandidate>> regEntry = getShard().getDataStore().registerChangeListener(
+                        message.getPath(), listener, message.getScope());
+
+        getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
+
+        return regEntry;
+    }
+
+    @Override
+    protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message) {
+        return new DelayedDataChangeListenerRegistration(message);
+    }
+
+    @Override
+    protected ActorRef newRegistrationActor(
+            ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        return createActor(DataChangeListenerRegistrationActor.props(registration));
+    }
+
+    @Override
+    protected Object newRegistrationReplyMessage(ActorRef registrationActor) {
+        return new RegisterChangeListenerReply(registrationActor);
+    }
+
+    @Override
+    protected String logName() {
+        return "registerDataChangeListener";
     }
 }
index 1a27f2e..a45ae52 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChang
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -95,7 +96,8 @@ final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> ext
     private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
 
         Future<Object> future = actorContext.executeOperationAsync(shard,
-                new RegisterDataTreeChangeListener(path, dataChangeListenerActor),
+                new RegisterDataTreeChangeListener(path, dataChangeListenerActor,
+                        getInstance() instanceof ClusteredDOMDataTreeChangeListener),
                 actorContext.getDatastoreContext().getShardInitializationTimeout());
 
         future.onComplete(new OnComplete<Object>(){
index 76458fd..fa55523 100644 (file)
@@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
@@ -19,65 +17,16 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeCh
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener,
-        ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> {
-    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
-    private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
-    private final Collection<ActorSelection> actors = new ArrayList<>();
 
+final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
+        RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration, ListenerRegistration<DOMDataTreeChangeListener>> {
     DataTreeChangeListenerSupport(final Shard shard) {
         super(shard);
     }
 
     @Override
-    void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
-        final EnableNotification msg = new EnableNotification(isLeader);
-        for (ActorSelection dataChangeListener : actors) {
-            dataChangeListener.tell(msg, getSelf());
-        }
-
-        if (isLeader) {
-            for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
-                reg.createDelegate(this);
-            }
-            delayedRegistrations.clear();
-            delayedRegistrations.trimToSize();
-        }
-    }
-
-    @Override
-    void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader, boolean hasLeader) {
-        LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
-
-        final ListenerRegistration<DOMDataTreeChangeListener> registration;
-        if (!isLeader) {
-            LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
-
-            DelayedDataTreeListenerRegistration delayedReg =
-                    new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
-            delayedRegistrations.add(delayedReg);
-            registration = delayedReg;
-        } else {
-            final Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> res =
-                    createDelegate(registerTreeChangeListener);
-            registration = res.getKey();
-            getShard().getDataStore().notifyOfInitialData(registerTreeChangeListener.getPath(),
-                    registration.getInstance(), res.getValue());
-        }
-
-        ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration));
-
-        LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
-            persistenceId(), listenerRegistration.path());
-
-        tellSender(new RegisterDataTreeChangeListenerReply(listenerRegistration));
-    }
-
-    @Override
-    Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(final RegisterDataTreeChangeListener message) {
+    Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(
+            final RegisterDataTreeChangeListener message) {
         ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
@@ -87,12 +36,40 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
 
         // Now store a reference to the data change listener so it can be notified
         // at a later point if notifications should be enabled or disabled
-        actors.add(dataChangeListenerPath);
+        if(!message.isRegisterOnAllInstances()) {
+            addListenerActor(dataChangeListenerPath);
+        }
 
         DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
 
-        LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath());
+        log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
+
+        Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> regEntry =
+                getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener);
+
+        getShard().getDataStore().notifyOfInitialData(message.getPath(),
+                regEntry.getKey().getInstance(), regEntry.getValue());
+
+        return regEntry;
+    }
+
+    @Override
+    protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(RegisterDataTreeChangeListener message) {
+        return new DelayedDataTreeListenerRegistration(message);
+    }
 
-        return getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener);
+    @Override
+    protected ActorRef newRegistrationActor(ListenerRegistration<DOMDataTreeChangeListener> registration) {
+        return createActor(DataTreeChangeListenerRegistrationActor.props(registration));
+    }
+
+    @Override
+    protected Object newRegistrationReplyMessage(ActorRef registrationActor) {
+        return new RegisterDataTreeChangeListenerReply(registrationActor);
+    }
+
+    @Override
+    protected String logName() {
+        return "registerTreeChangeListener";
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..c49c3de
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+final class DelayedDataChangeListenerRegistration extends DelayedListenerRegistration<
+           AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener> {
+
+    DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener) {
+        super(registerChangeListener);
+    }
+}
\ No newline at end of file
index 958ccc4..35f7308 100644 (file)
@@ -7,54 +7,19 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import java.util.Map.Entry;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Intermediate proxy registration returned to the user when we cannot
  * instantiate the registration immediately. It provides a bridge to
  * a real registration which may materialize at some point in the future.
  */
-final class DelayedDataTreeListenerRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
-    private final RegisterDataTreeChangeListener registerTreeChangeListener;
-    private volatile ListenerRegistration<DOMDataTreeChangeListener> delegate;
-    @GuardedBy("this")
-    private boolean closed;
+final class DelayedDataTreeListenerRegistration
+        extends DelayedListenerRegistration<DOMDataTreeChangeListener, RegisterDataTreeChangeListener> {
 
     DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
-        this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
-    }
-
-    synchronized void createDelegate(final LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> factory) {
-        if (!closed) {
-            final Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> res =
-                    factory.createDelegate(registerTreeChangeListener);
-            this.delegate = res.getKey();
-            factory.getShard().getDataStore().notifyOfInitialData(registerTreeChangeListener.getPath(),
-                    this.delegate.getInstance(), res.getValue());
-        }
-    }
-
-    @Override
-    public DOMDataTreeChangeListener getInstance() {
-        final ListenerRegistration<DOMDataTreeChangeListener> d = delegate;
-        return d == null ? null : d.getInstance();
-    }
-
-    @Override
-    public synchronized void close() {
-        if (!closed) {
-            closed = true;
-            if (delegate != null) {
-                delegate.close();
-            }
-        }
+        super(registerTreeChangeListener);
     }
 }
 
index 8eb595d..8f18cb7 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -7,49 +7,53 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import com.google.common.base.Optional;
+import java.util.EventListener;
+import java.util.Map.Entry;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
-final class DelayedListenerRegistration implements
-    ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
+abstract class DelayedListenerRegistration<L extends EventListener, R> implements ListenerRegistration<L> {
+    private final R registrationMessage;
+    private volatile ListenerRegistration<L> delegate;
 
-    private volatile boolean closed;
+    @GuardedBy("this")
+    private boolean closed;
 
-    private final RegisterChangeListener registerChangeListener;
-
-    private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                                                         NormalizedNode<?, ?>>> delegate;
-
-    DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
-        this.registerChangeListener = registerChangeListener;
+    protected DelayedListenerRegistration(R registrationMessage) {
+        this.registrationMessage = registrationMessage;
     }
 
-    void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
-                                        NormalizedNode<?, ?>>> registration) {
-        this.delegate = registration;
+    R getRegistrationMessage() {
+        return registrationMessage;
     }
 
-    boolean isClosed() {
-        return closed;
+    ListenerRegistration<L> getDelegate() {
+        return delegate;
     }
 
-    RegisterChangeListener getRegisterChangeListener() {
-        return registerChangeListener;
+    synchronized <LR extends ListenerRegistration<L>> void createDelegate(
+            final LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> factory) {
+        if (!closed) {
+            final Entry<LR, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
+            this.delegate = res.getKey();
+        }
     }
 
     @Override
-    public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
-        return delegate != null ? delegate.getInstance() : null;
+    public L getInstance() {
+        final ListenerRegistration<L> d = delegate;
+        return d == null ? null : (L)d.getInstance();
     }
 
     @Override
-    public void close() {
-        closed = true;
-        if(delegate != null) {
-            delegate.close();
+    public synchronized void close() {
+        if (!closed) {
+            closed = true;
+            if (delegate != null) {
+                delegate.close();
+            }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java
new file mode 100644 (file)
index 0000000..5cea06b
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public interface ListenerRegistrationMessage {
+    YangInstanceIdentifier getPath();
+
+    boolean isRegisterOnAllInstances();
+}
index f7a51a9..a3b7e12 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class RegisterChangeListener implements SerializableMessage {
+public class RegisterChangeListener implements SerializableMessage, ListenerRegistrationMessage {
     public static final Class<ListenerRegistrationMessages.RegisterChangeListener> SERIALIZABLE_CLASS =
             ListenerRegistrationMessages.RegisterChangeListener.class;
 
@@ -36,6 +36,7 @@ public class RegisterChangeListener implements SerializableMessage {
         this.registerOnAllInstances = registerOnAllInstances;
     }
 
+    @Override
     public YangInstanceIdentifier getPath() {
         return path;
     }
@@ -49,6 +50,7 @@ public class RegisterChangeListener implements SerializableMessage {
         return dataChangeListener.path();
     }
 
+    @Override
     public boolean isRegisterOnAllInstances() {
         return registerOnAllInstances;
     }
index 941336e..f488112 100644 (file)
@@ -20,16 +20,20 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard
  * leader.
  */
-public final class RegisterDataTreeChangeListener implements Externalizable {
+public final class RegisterDataTreeChangeListener implements Externalizable, ListenerRegistrationMessage {
     private static final long serialVersionUID = 1L;
     private ActorRef dataTreeChangeListenerPath;
     private YangInstanceIdentifier path;
+    private boolean registerOnAllInstances;
 
-    public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) {
+    public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath,
+            final boolean registerOnAllInstances) {
         this.path = Preconditions.checkNotNull(path);
         this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath);
+        this.registerOnAllInstances = registerOnAllInstances;
     }
 
+    @Override
     public YangInstanceIdentifier getPath() {
         return path;
     }
@@ -38,15 +42,22 @@ public final class RegisterDataTreeChangeListener implements Externalizable {
         return dataTreeChangeListenerPath;
     }
 
+    @Override
+    public boolean isRegisterOnAllInstances() {
+        return registerOnAllInstances;
+    }
+
     @Override
     public void writeExternal(final ObjectOutput out) throws IOException {
         out.writeObject(dataTreeChangeListenerPath);
         SerializationUtils.serializePath(path, out);
+        out.writeBoolean(registerOnAllInstances);
     }
 
     @Override
     public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         dataTreeChangeListenerPath = (ActorRef) in.readObject();
         path = SerializationUtils.deserializePath(in);
+        registerOnAllInstances = in.readBoolean();
     }
 }
index 0db1cdf..36aa9a2 100644 (file)
@@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -69,6 +70,8 @@ public abstract class AbstractShardTest extends AbstractActorTest{
             shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
             shardHeartbeatIntervalInMillis(100);
 
+    protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
     @Before
     public void setUp() {
         InMemorySnapshotStore.clear();
@@ -79,6 +82,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
     public void tearDown() {
         InMemorySnapshotStore.clear();
         InMemoryJournal.clear();
+        actorFactory.close();
     }
 
     protected DatastoreContext newDatastoreContext() {
index 924f650..fd1ab2b 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import scala.concurrent.ExecutionContextExecutor;
@@ -74,6 +75,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
 
             RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
             Assert.assertEquals("getPath", path, registerMsg.getPath());
+            Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
 
             reply(new RegisterDataTreeChangeListenerReply(getRef()));
 
@@ -101,6 +103,38 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest {
         }};
     }
 
+    @Test(timeout=10000)
+    public void testSuccessfulRegistrationForClusteredListener() {
+        new JavaTestKit(getSystem()) {{
+            ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+                    mock(ClusterWrapper.class), mock(Configuration.class));
+
+            ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(ClusteredDOMDataTreeChangeListener.class);
+
+            final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
+                    new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener);
+
+            final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+            new Thread() {
+                @Override
+                public void run() {
+                    proxy.init("shard-1", path);
+                }
+
+            }.start();
+
+            FiniteDuration timeout = duration("5 seconds");
+            FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+            Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+            reply(new LocalShardFound(getRef()));
+
+            RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
+            Assert.assertEquals("getPath", path, registerMsg.getPath());
+            Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
+        }};
+    }
+
     @Test(timeout=10000)
     public void testLocalShardNotFound() {
         new JavaTestKit(getSystem()) {{
index c19f609..9baea72 100644 (file)
@@ -117,7 +117,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
             int expectedEvents, boolean isLeader) {
         MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
         ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
-        support.onMessage(new RegisterDataTreeChangeListener(path, dclActor), isLeader, true);
+        support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true);
         return listener;
     }
 
index f097c19..6c517a4 100644 (file)
@@ -279,7 +279,7 @@ public class ShardTest extends AbstractShardTest {
             final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
                     "testRegisterDataTreeChangeListener-DataTreeChangeListener");
 
-            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
+            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
 
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
                     RegisterDataTreeChangeListenerReply.class);
@@ -347,7 +347,7 @@ public class ShardTest extends AbstractShardTest {
             assertEquals("Got first ElectionTimeout", true,
                 onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
 
-            shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
+            shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
             final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                 RegisterDataTreeChangeListenerReply.class);
             assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
@@ -2517,146 +2517,150 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+    public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            dataStoreContextBuilder.persistent(false);
-            final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
-            final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
-            final Creator<Shard> creator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
-                boolean firstElectionTimeout = true;
-
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(newShardBuilder()) {
-                        @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
-                            if(message instanceof ElectionTimeout && firstElectionTimeout) {
-                                firstElectionTimeout = false;
-                                final ActorRef self = getSelf();
-                                new Thread() {
-                                    @Override
-                                    public void run() {
-                                        Uninterruptibles.awaitUninterruptibly(
-                                            onChangeListenerRegistered, 5, TimeUnit.SECONDS);
-                                        self.tell(message, self);
-                                    }
-                                }.start();
-
-                                onFirstElectionTimeout.countDown();
-                            } else {
-                                super.onReceiveCommand(message);
-                            }
-                        }
-                    };
-                }
-            };
+            String testName = "testClusteredDataChangeListenerDelayedRegistration";
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
 
             final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
-                "testDataChangeListenerOnFollower-DataChangeListener");
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                    actorFactory.generateActorId(testName + "-DataChangeListener"));
 
-            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
-                    withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    actorFactory.generateActorId(testName + "-shard"));
 
-            assertEquals("Got first ElectionTimeout", true,
-                onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
-            shard.tell(new FindLeader(), getRef());
-            final FindLeaderReply findLeadeReply =
-                expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+            waitUntilNoLeader(shard);
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
 
             shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
             final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
                 RegisterChangeListenerReply.class);
-            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-            onChangeListenerRegistered.countDown();
+            shard.tell(new ElectionTimeout(), ActorRef.noSender());
 
             listener.waitForChangeEvents();
-
-            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
         }};
     }
 
     @Test
-    public void testClusteredDataChangeListernerRegistration() throws Exception {
-        dataStoreContextBuilder.persistent(false).build();
+    public void testClusteredDataChangeListenerRegistration() throws Exception {
         new ShardTestKit(getSystem()) {{
-            final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
-                .shardName("inventory").type("config").build();
-
-            final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
-                .shardName("inventory").type("config").build();
-            final Creator<Shard> followerShardCreator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
+            String testName = "testClusteredDataChangeListenerRegistration";
+            final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+            final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+            final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+                    Shard.builder().id(followerShardID).
+                        datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+                        peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+                            "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+                    Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+                        peerAddresses(Collections.singletonMap(followerShardID.toString(),
+                            "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            String leaderPath = waitUntilLeader(followerShard);
+            assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
 
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()).
-                            peerAddresses(Collections.singletonMap(member2ShardID.toString(),
-                                    "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {
-                        @Override
-                        public void onReceiveCommand(final Object message) throws Exception {
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataChangeListener listener = new MockDataChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+                    actorFactory.generateActorId(testName + "-DataChangeListener"));
 
-                            if(!(message instanceof ElectionTimeout)) {
-                                super.onReceiveCommand(message);
-                            }
-                        }
-                    };
-                }
-            };
+            followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                RegisterChangeListenerReply.class);
+            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
 
-            final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
-                private static final long serialVersionUID = 1L;
+            writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-                @Override
-                public Shard create() throws Exception {
-                    return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()).
-                            peerAddresses(Collections.singletonMap(member1ShardID.toString(),
-                                    "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {};
-                }
-            };
+            listener.waitForChangeEvents();
+        }};
+    }
 
+    @Test
+    public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+            dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
 
-            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(followerShardCreator)),
-                member1ShardID.toString());
+            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
 
-            final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
-                Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
-                member2ShardID.toString());
-            // Sleep to let election happen
-            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+            final TestActorRef<Shard> shard = actorFactory.createTestActor(
+                    newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    actorFactory.generateActorId(testName + "-shard"));
 
-            shard.tell(new FindLeader(), getRef());
-            final FindLeaderReply findLeaderReply =
-                    expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+            waitUntilNoLeader(shard);
 
             final YangInstanceIdentifier path = TestModel.TEST_PATH;
-            final MockDataChangeListener listener = new MockDataChangeListener(1);
-            final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
-                "testDataChangeListenerOnFollower-DataChangeListener");
 
-            shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
-            final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
-                RegisterChangeListenerReply.class);
-            assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+            shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+            final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterDataTreeChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
 
             writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
+            shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
             listener.waitForChangeEvents();
+        }};
+    }
 
-            dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    @Test
+    public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            String testName = "testClusteredDataTreeChangeListenerRegistration";
+            final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+            final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+                    actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+            final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+                    Shard.builder().id(followerShardID).
+                        datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+                        peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+                            "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+            final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+                    Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+                        peerAddresses(Collections.singletonMap(followerShardID.toString(),
+                            "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+                    withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+            leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+            String leaderPath = waitUntilLeader(followerShard);
+            assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+            final YangInstanceIdentifier path = TestModel.TEST_PATH;
+            final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+            final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+                    actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+            followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+            final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+                    RegisterDataTreeChangeListenerReply.class);
+            assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+            writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            listener.waitForChangeEvents();
         }};
     }
 
index 281a190..ae607b9 100644 (file)
@@ -46,14 +46,14 @@ public class ShardTestKit extends JavaTestKit {
 
     }
 
-    public void waitUntilLeader(ActorRef shard) {
+    public String waitUntilLeader(ActorRef shard) {
         FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
         for(int i = 0; i < 20 * 5; i++) {
             Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
             try {
                 FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration);
                 if(resp.getLeaderActor() != null) {
-                    return;
+                    return resp.getLeaderActor();
                 }
             } catch(TimeoutException e) {
             } catch(Exception e) {
@@ -66,6 +66,7 @@ public class ShardTestKit extends JavaTestKit {
         }
 
         Assert.fail("Leader not found for shard " + shard.path());
+        return null;
     }
 
     public void waitUntilNoLeader(ActorRef shard) {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.