From 6050fd28f2def659abb5bc9d7127eb748b5fb32a Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 19 Nov 2015 06:26:10 -0500 Subject: [PATCH] Bug 4651: Implement handling of ClusteredDOMDataTreeChangeListener in CDS Implemented handling of ClusteredDOMDataTreeChangeListener similar as to what was done previously for ClusteredDOMDataChangeListener. I also refactored the listener support classes used by Shard and extracted generic base classes for the common functionality. Change-Id: I694a6a4ce41284f7ecd3bf73bc6201e9d5555998 Signed-off-by: Tom Pantelis --- .../AbstractDataListenerSupport.java | 109 +++++++++ .../datastore/DataChangeListenerSupport.java | 116 +++------- .../DataTreeChangeListenerProxy.java | 4 +- .../DataTreeChangeListenerSupport.java | 93 +++----- ...DelayedDataChangeListenerRegistration.java | 21 ++ .../DelayedDataTreeListenerRegistration.java | 41 +--- .../DelayedListenerRegistration.java | 62 ++--- .../messages/ListenerRegistrationMessage.java | 16 ++ .../messages/RegisterChangeListener.java | 4 +- .../RegisterDataTreeChangeListener.java | 15 +- .../cluster/datastore/AbstractShardTest.java | 4 + .../DataTreeChangeListenerProxyTest.java | 34 +++ .../DataTreeChangeListenerSupportTest.java | 2 +- .../cluster/datastore/ShardTest.java | 214 +++++++++--------- .../cluster/datastore/ShardTestKit.java | 5 +- 15 files changed, 421 insertions(+), 319 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java new file mode 100644 index 0000000000..a253b794db --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataListenerSupport.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import com.google.common.base.Optional; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EventListener; +import java.util.Map.Entry; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class AbstractDataListenerSupport, LR extends ListenerRegistration> + extends LeaderLocalDelegateFactory> { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ArrayList delayedListenerRegistrations = new ArrayList<>(); + private final ArrayList delayedListenerOnAllRegistrations = new ArrayList<>(); + private final Collection actors = new ArrayList<>(); + + protected AbstractDataListenerSupport(Shard shard) { + super(shard); + } + + @Override + void onLeadershipChange(boolean isLeader, boolean hasLeader) { + log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader); + + final EnableNotification msg = new EnableNotification(isLeader); + for(ActorSelection dataChangeListener : actors) { + dataChangeListener.tell(msg, getSelf()); + } + + if(hasLeader) { + for(D reg : delayedListenerOnAllRegistrations) { + reg.createDelegate(this); + } + + delayedListenerOnAllRegistrations.clear(); + delayedListenerOnAllRegistrations.trimToSize(); + } + + if(isLeader) { + for(D reg : delayedListenerRegistrations) { + reg.createDelegate(this); + } + + delayedListenerRegistrations.clear(); + delayedListenerRegistrations.trimToSize(); + } + } + + @Override + void onMessage(R message, boolean isLeader, boolean hasLeader) { + log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader); + + final ListenerRegistration registration; + if((hasLeader && message.isRegisterOnAllInstances()) || isLeader) { + final Entry> res = createDelegate(message); + registration = res.getKey(); + } else { + log.debug("{}: Shard is not the leader - delaying registration", persistenceId()); + + D delayedReg = newDelayedListenerRegistration(message); + if(message.isRegisterOnAllInstances()) { + delayedListenerOnAllRegistrations.add(delayedReg); + } else { + delayedListenerRegistrations.add(delayedReg); + } + + registration = delayedReg; + } + + ActorRef registrationActor = newRegistrationActor(registration); + + log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(), + registrationActor.path()); + + tellSender(newRegistrationReplyMessage(registrationActor)); + } + + protected Logger log() { + return log; + } + + protected void addListenerActor(ActorSelection actor) { + actors.add(actor); + } + + protected abstract D newDelayedListenerRegistration(R message); + + protected abstract ActorRef newRegistrationActor(ListenerRegistration registration); + + protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor); + + protected abstract String logName(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java index 05accddb78..f4b6bcc9fd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java @@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.base.Optional; -import java.util.ArrayList; -import java.util.List; import java.util.Map.Entry; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; @@ -22,88 +20,16 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -final class DataChangeListenerSupport extends LeaderLocalDelegateFactory>>, - Optional> { - private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class); - private final List delayedListenerRegistrations = new ArrayList<>(); - private final List dataChangeListeners = new ArrayList<>(); - private final List delayedRegisterOnAllListeners = new ArrayList<>(); +final class DataChangeListenerSupport extends AbstractDataListenerSupport< + AsyncDataChangeListener>, RegisterChangeListener, + DelayedDataChangeListenerRegistration, DataChangeListenerRegistration< + AsyncDataChangeListener>>> { DataChangeListenerSupport(final Shard shard) { super(shard); } - @Override - void onLeadershipChange(final boolean isLeader, boolean hasLeader) { - LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader); - - for (ActorSelection dataChangeListener : dataChangeListeners) { - dataChangeListener.tell(new EnableNotification(isLeader), getSelf()); - } - - if(hasLeader) { - for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) { - registerDelayedListeners(reg); - } - delayedRegisterOnAllListeners.clear(); - } - - if (isLeader) { - for (DelayedListenerRegistration reg: delayedListenerRegistrations) { - registerDelayedListeners(reg); - } - - delayedListenerRegistrations.clear(); - } - } - - private void registerDelayedListeners(DelayedListenerRegistration reg) { - if(!reg.isClosed()) { - final Entry>>, - Optional> res = createDelegate(reg.getRegisterChangeListener()); - reg.setDelegate(res.getKey()); - getShard().getDataStore().notifyOfInitialData(res.getKey(), res.getValue()); - } - } - - @Override - void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) { - - LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}", - persistenceId(), message.getPath(), isLeader, hasLeader); - - final ListenerRegistration>> registration; - if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) { - final Entry>>, - Optional> res = createDelegate(message); - registration = res.getKey(); - - getShard().getDataStore().notifyOfInitialData(res.getKey(), res.getValue()); - } else { - LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId()); - - DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message); - if(message.isRegisterOnAllInstances()) { - delayedRegisterOnAllListeners.add(delayedReg); - } else { - delayedListenerRegistrations.add(delayedReg); - } - registration = delayedReg; - } - - ActorRef listenerRegistration = createActor(DataChangeListenerRegistrationActor.props(registration)); - - LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", - persistenceId(), listenerRegistration.path()); - - tellSender(new RegisterChangeListenerReply(listenerRegistration)); - } - @Override Entry>>, Optional> createDelegate(final RegisterChangeListener message) { @@ -117,15 +43,41 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory> listener = new DataChangeListenerProxy(dataChangeListenerPath); - LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath()); + log().debug("{}: Registering for path {}", persistenceId(), message.getPath()); - return getShard().getDataStore().registerChangeListener(message.getPath(), listener, - message.getScope()); + Entry>>, + Optional> regEntry = getShard().getDataStore().registerChangeListener( + message.getPath(), listener, message.getScope()); + + getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue()); + + return regEntry; + } + + @Override + protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message) { + return new DelayedDataChangeListenerRegistration(message); + } + + @Override + protected ActorRef newRegistrationActor( + ListenerRegistration>> registration) { + return createActor(DataChangeListenerRegistrationActor.props(registration)); + } + + @Override + protected Object newRegistrationReplyMessage(ActorRef registrationActor) { + return new RegisterChangeListenerReply(registrationActor); + } + + @Override + protected String logName() { + return "registerDataChangeListener"; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java index 1a27f2e4fc..a45ae52afd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java @@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChang import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -95,7 +96,8 @@ final class DataTreeChangeListenerProxy ext private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) { Future future = actorContext.executeOperationAsync(shard, - new RegisterDataTreeChangeListener(path, dataChangeListenerActor), + new RegisterDataTreeChangeListener(path, dataChangeListenerActor, + getInstance() instanceof ClusteredDOMDataTreeChangeListener), actorContext.getDatastoreContext().getShardInitializationTimeout()); future.onComplete(new OnComplete(){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java index 76458fd8ed..fa55523db0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java @@ -10,8 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.google.common.base.Optional; -import java.util.ArrayList; -import java.util.Collection; import java.util.Map.Entry; import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; @@ -19,65 +17,16 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeCh import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory, Optional> { - private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class); - private final ArrayList delayedRegistrations = new ArrayList<>(); - private final Collection actors = new ArrayList<>(); +final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport> { DataTreeChangeListenerSupport(final Shard shard) { super(shard); } @Override - void onLeadershipChange(final boolean isLeader, boolean hasLeader) { - final EnableNotification msg = new EnableNotification(isLeader); - for (ActorSelection dataChangeListener : actors) { - dataChangeListener.tell(msg, getSelf()); - } - - if (isLeader) { - for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) { - reg.createDelegate(this); - } - delayedRegistrations.clear(); - delayedRegistrations.trimToSize(); - } - } - - @Override - void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader, boolean hasLeader) { - LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader); - - final ListenerRegistration registration; - if (!isLeader) { - LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId()); - - DelayedDataTreeListenerRegistration delayedReg = - new DelayedDataTreeListenerRegistration(registerTreeChangeListener); - delayedRegistrations.add(delayedReg); - registration = delayedReg; - } else { - final Entry, Optional> res = - createDelegate(registerTreeChangeListener); - registration = res.getKey(); - getShard().getDataStore().notifyOfInitialData(registerTreeChangeListener.getPath(), - registration.getInstance(), res.getValue()); - } - - ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration)); - - LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", - persistenceId(), listenerRegistration.path()); - - tellSender(new RegisterDataTreeChangeListenerReply(listenerRegistration)); - } - - @Override - Entry, Optional> createDelegate(final RegisterDataTreeChangeListener message) { + Entry, Optional> createDelegate( + final RegisterDataTreeChangeListener message) { ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath()); // Notify the listener if notifications should be enabled or not @@ -87,12 +36,40 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory, Optional> regEntry = + getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener); + + getShard().getDataStore().notifyOfInitialData(message.getPath(), + regEntry.getKey().getInstance(), regEntry.getValue()); + + return regEntry; + } + + @Override + protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(RegisterDataTreeChangeListener message) { + return new DelayedDataTreeListenerRegistration(message); + } - return getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener); + @Override + protected ActorRef newRegistrationActor(ListenerRegistration registration) { + return createActor(DataTreeChangeListenerRegistrationActor.props(registration)); + } + + @Override + protected Object newRegistrationReplyMessage(ActorRef registrationActor) { + return new RegisterDataTreeChangeListenerReply(registrationActor); + } + + @Override + protected String logName() { + return "registerTreeChangeListener"; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java new file mode 100644 index 0000000000..c49c3de9a8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataChangeListenerRegistration.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +final class DelayedDataChangeListenerRegistration extends DelayedListenerRegistration< + AsyncDataChangeListener>, RegisterChangeListener> { + + DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener) { + super(registerChangeListener); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java index 958ccc4438..35f7308689 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java @@ -7,54 +7,19 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import java.util.Map.Entry; -import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; -import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; /** * Intermediate proxy registration returned to the user when we cannot * instantiate the registration immediately. It provides a bridge to * a real registration which may materialize at some point in the future. */ -final class DelayedDataTreeListenerRegistration implements ListenerRegistration { - private final RegisterDataTreeChangeListener registerTreeChangeListener; - private volatile ListenerRegistration delegate; - @GuardedBy("this") - private boolean closed; +final class DelayedDataTreeListenerRegistration + extends DelayedListenerRegistration { DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) { - this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener); - } - - synchronized void createDelegate(final LeaderLocalDelegateFactory, Optional> factory) { - if (!closed) { - final Entry, Optional> res = - factory.createDelegate(registerTreeChangeListener); - this.delegate = res.getKey(); - factory.getShard().getDataStore().notifyOfInitialData(registerTreeChangeListener.getPath(), - this.delegate.getInstance(), res.getValue()); - } - } - - @Override - public DOMDataTreeChangeListener getInstance() { - final ListenerRegistration d = delegate; - return d == null ? null : d.getInstance(); - } - - @Override - public synchronized void close() { - if (!closed) { - closed = true; - if (delegate != null) { - delegate.close(); - } - } + super(registerTreeChangeListener); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java index 8eb595df03..8f18cb74dc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,49 +7,53 @@ */ package org.opendaylight.controller.cluster.datastore; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import com.google.common.base.Optional; +import java.util.EventListener; +import java.util.Map.Entry; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -final class DelayedListenerRegistration implements - ListenerRegistration>> { +abstract class DelayedListenerRegistration implements ListenerRegistration { + private final R registrationMessage; + private volatile ListenerRegistration delegate; - private volatile boolean closed; + @GuardedBy("this") + private boolean closed; - private final RegisterChangeListener registerChangeListener; - - private volatile ListenerRegistration>> delegate; - - DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) { - this.registerChangeListener = registerChangeListener; + protected DelayedListenerRegistration(R registrationMessage) { + this.registrationMessage = registrationMessage; } - void setDelegate( final ListenerRegistration>> registration) { - this.delegate = registration; + R getRegistrationMessage() { + return registrationMessage; } - boolean isClosed() { - return closed; + ListenerRegistration getDelegate() { + return delegate; } - RegisterChangeListener getRegisterChangeListener() { - return registerChangeListener; + synchronized > void createDelegate( + final LeaderLocalDelegateFactory> factory) { + if (!closed) { + final Entry> res = factory.createDelegate(registrationMessage); + this.delegate = res.getKey(); + } } @Override - public AsyncDataChangeListener> getInstance() { - return delegate != null ? delegate.getInstance() : null; + public L getInstance() { + final ListenerRegistration d = delegate; + return d == null ? null : (L)d.getInstance(); } @Override - public void close() { - closed = true; - if(delegate != null) { - delegate.close(); + public synchronized void close() { + if (!closed) { + closed = true; + if (delegate != null) { + delegate.close(); + } } } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java new file mode 100644 index 0000000000..5cea06b4cb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ListenerRegistrationMessage.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +public interface ListenerRegistrationMessage { + YangInstanceIdentifier getPath(); + + boolean isRegisterOnAllInstances(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java index f7a51a93ff..a3b7e12b07 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java @@ -17,7 +17,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -public class RegisterChangeListener implements SerializableMessage { +public class RegisterChangeListener implements SerializableMessage, ListenerRegistrationMessage { public static final Class SERIALIZABLE_CLASS = ListenerRegistrationMessages.RegisterChangeListener.class; @@ -36,6 +36,7 @@ public class RegisterChangeListener implements SerializableMessage { this.registerOnAllInstances = registerOnAllInstances; } + @Override public YangInstanceIdentifier getPath() { return path; } @@ -49,6 +50,7 @@ public class RegisterChangeListener implements SerializableMessage { return dataChangeListener.path(); } + @Override public boolean isRegisterOnAllInstances() { return registerOnAllInstances; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java index 941336e630..f48811203c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java @@ -20,16 +20,20 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard * leader. */ -public final class RegisterDataTreeChangeListener implements Externalizable { +public final class RegisterDataTreeChangeListener implements Externalizable, ListenerRegistrationMessage { private static final long serialVersionUID = 1L; private ActorRef dataTreeChangeListenerPath; private YangInstanceIdentifier path; + private boolean registerOnAllInstances; - public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) { + public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath, + final boolean registerOnAllInstances) { this.path = Preconditions.checkNotNull(path); this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath); + this.registerOnAllInstances = registerOnAllInstances; } + @Override public YangInstanceIdentifier getPath() { return path; } @@ -38,15 +42,22 @@ public final class RegisterDataTreeChangeListener implements Externalizable { return dataTreeChangeListenerPath; } + @Override + public boolean isRegisterOnAllInstances() { + return registerOnAllInstances; + } + @Override public void writeExternal(final ObjectOutput out) throws IOException { out.writeObject(dataTreeChangeListenerPath); SerializationUtils.serializePath(path, out); + out.writeBoolean(registerOnAllInstances); } @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { dataTreeChangeListenerPath = (ActorRef) in.readObject(); path = SerializationUtils.deserializePath(in); + registerOnAllInstances = in.readBoolean(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 0db1cdf561..36aa9a2721 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.raft.TestActorFactory; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; @@ -69,6 +70,8 @@ public abstract class AbstractShardTest extends AbstractActorTest{ shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000). shardHeartbeatIntervalInMillis(100); + protected final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + @Before public void setUp() { InMemorySnapshotStore.clear(); @@ -79,6 +82,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ public void tearDown() { InMemorySnapshotStore.clear(); InMemoryJournal.clear(); + actorFactory.close(); } protected DatastoreContext newDatastoreContext() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java index 924f650e2c..fd1ab2bb4d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxyTest.java @@ -39,6 +39,7 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import scala.concurrent.ExecutionContextExecutor; @@ -74,6 +75,7 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, RegisterDataTreeChangeListener.class); Assert.assertEquals("getPath", path, registerMsg.getPath()); + Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances()); reply(new RegisterDataTreeChangeListenerReply(getRef())); @@ -101,6 +103,38 @@ public class DataTreeChangeListenerProxyTest extends AbstractActorTest { }}; } + @Test(timeout=10000) + public void testSuccessfulRegistrationForClusteredListener() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), + mock(ClusterWrapper.class), mock(Configuration.class)); + + ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(ClusteredDOMDataTreeChangeListener.class); + + final DataTreeChangeListenerProxy proxy = + new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener); + + final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME); + new Thread() { + @Override + public void run() { + proxy.init("shard-1", path); + } + + }.start(); + + FiniteDuration timeout = duration("5 seconds"); + FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class); + Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName()); + + reply(new LocalShardFound(getRef())); + + RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, RegisterDataTreeChangeListener.class); + Assert.assertEquals("getPath", path, registerMsg.getPath()); + Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances()); + }}; + } + @Test(timeout=10000) public void testLocalShardNotFound() { new JavaTestKit(getSystem()) {{ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java index c19f60968c..9baea72d8d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java @@ -117,7 +117,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest { int expectedEvents, boolean isLeader) { MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents); ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener)); - support.onMessage(new RegisterDataTreeChangeListener(path, dclActor), isLeader, true); + support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true); return listener; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index f097c19e51..6c517a464f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -279,7 +279,7 @@ public class ShardTest extends AbstractShardTest { final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener), "testRegisterDataTreeChangeListener-DataTreeChangeListener"); - shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef()); + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef()); final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"), RegisterDataTreeChangeListenerReply.class); @@ -347,7 +347,7 @@ public class ShardTest extends AbstractShardTest { assertEquals("Got first ElectionTimeout", true, onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); - shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef()); + shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef()); final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterDataTreeChangeListenerReply.class); assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); @@ -2517,146 +2517,150 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testClusteredDataChangeListernerDelayedRegistration() throws Exception { + public void testClusteredDataChangeListenerDelayedRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - dataStoreContextBuilder.persistent(false); - final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1); - final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1); - final Creator creator = new Creator() { - private static final long serialVersionUID = 1L; - boolean firstElectionTimeout = true; - - @Override - public Shard create() throws Exception { - return new Shard(newShardBuilder()) { - @Override - public void onReceiveCommand(final Object message) throws Exception { - if(message instanceof ElectionTimeout && firstElectionTimeout) { - firstElectionTimeout = false; - final ActorRef self = getSelf(); - new Thread() { - @Override - public void run() { - Uninterruptibles.awaitUninterruptibly( - onChangeListenerRegistered, 5, TimeUnit.SECONDS); - self.tell(message, self); - } - }.start(); - - onFirstElectionTimeout.countDown(); - } else { - super.onReceiveCommand(message); - } - } - }; - } - }; + String testName = "testClusteredDataChangeListenerDelayedRegistration"; + dataStoreContextBuilder.shardElectionTimeoutFactor(1000); final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), - "testDataChangeListenerOnFollower-DataChangeListener"); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + actorFactory.generateActorId(testName + "-DataChangeListener")); - final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()). - withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower"); + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(testName + "-shard")); - assertEquals("Got first ElectionTimeout", true, - onFirstElectionTimeout.await(5, TimeUnit.SECONDS)); - - shard.tell(new FindLeader(), getRef()); - final FindLeaderReply findLeadeReply = - expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor()); + waitUntilNoLeader(shard); final YangInstanceIdentifier path = TestModel.TEST_PATH; shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - onChangeListenerRegistered.countDown(); + shard.tell(new ElectionTimeout(), ActorRef.noSender()); listener.waitForChangeEvents(); - - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test - public void testClusteredDataChangeListernerRegistration() throws Exception { - dataStoreContextBuilder.persistent(false).build(); + public void testClusteredDataChangeListenerRegistration() throws Exception { new ShardTestKit(getSystem()) {{ - final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); - - final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2") - .shardName("inventory").type("config").build(); - final Creator followerShardCreator = new Creator() { - private static final long serialVersionUID = 1L; + String testName = "testClusteredDataChangeListenerRegistration"; + final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build(); + + final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build(); + + final TestActorRef followerShard = actorFactory.createTestActor( + Shard.builder().id(followerShardID). + datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()). + peerAddresses(Collections.singletonMap(leaderShardID.toString(), + "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); + + final TestActorRef leaderShard = actorFactory.createTestActor( + Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()). + peerAddresses(Collections.singletonMap(followerShardID.toString(), + "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); + + leaderShard.tell(new ElectionTimeout(), ActorRef.noSender()); + String leaderPath = waitUntilLeader(followerShard); + assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); - @Override - public Shard create() throws Exception { - return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()). - peerAddresses(Collections.singletonMap(member2ShardID.toString(), - "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) { - @Override - public void onReceiveCommand(final Object message) throws Exception { + final YangInstanceIdentifier path = TestModel.TEST_PATH; + final MockDataChangeListener listener = new MockDataChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener), + actorFactory.generateActorId(testName + "-DataChangeListener")); - if(!(message instanceof ElectionTimeout)) { - super.onReceiveCommand(message); - } - } - }; - } - }; + followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); + final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterChangeListenerReply.class); + assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); - final Creator leaderShardCreator = new Creator() { - private static final long serialVersionUID = 1L; + writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - @Override - public Shard create() throws Exception { - return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()). - peerAddresses(Collections.singletonMap(member1ShardID.toString(), - "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {}; - } - }; + listener.waitForChangeEvents(); + }}; + } + @Test + public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + String testName = "testClusteredDataTreeChangeListenerDelayedRegistration"; + dataStoreContextBuilder.shardElectionTimeoutFactor(1000); - final TestActorRef shard = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(followerShardCreator)), - member1ShardID.toString()); + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + actorFactory.generateActorId(testName + "-DataTreeChangeListener")); - final TestActorRef shardLeader = TestActorRef.create(getSystem(), - Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()), - member2ShardID.toString()); - // Sleep to let election happen - Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + final TestActorRef shard = actorFactory.createTestActor( + newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()), + actorFactory.generateActorId(testName + "-shard")); - shard.tell(new FindLeader(), getRef()); - final FindLeaderReply findLeaderReply = - expectMsgClass(duration("5 seconds"), FindLeaderReply.class); - assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor()); + waitUntilNoLeader(shard); final YangInstanceIdentifier path = TestModel.TEST_PATH; - final MockDataChangeListener listener = new MockDataChangeListener(1); - final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener), - "testDataChangeListenerOnFollower-DataChangeListener"); - shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef()); - final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"), - RegisterChangeListenerReply.class); - assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath()); + shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); + final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + shard.tell(new ElectionTimeout(), ActorRef.noSender()); + listener.waitForChangeEvents(); + }}; + } - dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + @Test + public void testClusteredDataTreeChangeListenerRegistration() throws Exception { + new ShardTestKit(getSystem()) {{ + String testName = "testClusteredDataTreeChangeListenerRegistration"; + final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build(); + + final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName( + actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build(); + + final TestActorRef followerShard = actorFactory.createTestActor( + Shard.builder().id(followerShardID). + datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()). + peerAddresses(Collections.singletonMap(leaderShardID.toString(), + "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString()); + + final TestActorRef leaderShard = actorFactory.createTestActor( + Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()). + peerAddresses(Collections.singletonMap(followerShardID.toString(), + "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props(). + withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString()); + + leaderShard.tell(new ElectionTimeout(), ActorRef.noSender()); + String leaderPath = waitUntilLeader(followerShard); + assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath); + + final YangInstanceIdentifier path = TestModel.TEST_PATH; + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener), + actorFactory.generateActorId(testName + "-DataTreeChangeListener")); + + followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef()); + final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"), + RegisterDataTreeChangeListenerReply.class); + assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath()); + + writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + listener.waitForChangeEvents(); }}; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java index 281a190e94..ae607b9b3d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java @@ -46,14 +46,14 @@ public class ShardTestKit extends JavaTestKit { } - public void waitUntilLeader(ActorRef shard) { + public String waitUntilLeader(ActorRef shard) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); for(int i = 0; i < 20 * 5; i++) { Future future = Patterns.ask(shard, new FindLeader(), new Timeout(duration)); try { FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration); if(resp.getLeaderActor() != null) { - return; + return resp.getLeaderActor(); } } catch(TimeoutException e) { } catch(Exception e) { @@ -66,6 +66,7 @@ public class ShardTestKit extends JavaTestKit { } Assert.fail("Leader not found for shard " + shard.path()); + return null; } public void waitUntilNoLeader(ActorRef shard) { -- 2.36.6