From 66d39ecc3effd52c96c7a772a46612008e34fbc9 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 17 Mar 2015 11:18:53 +0100 Subject: [PATCH] BUG-2673: make CDS implement DOMDataTreeChangeListener This patch adds the base support for registering DOMDataTreeChangeListeners. These are delivered only on the local node, as efficient serialization requires interaction with the cluster topology and shard leadership handoff. That functionality will be delivered in a follow-up patch. It also introduces a bit common infrastructure to be used by DataChangeListener code, as it performs a very similar function. Change-Id: Ifb91d08857684fb160fd923bc25c294d2fca4bc3 Signed-off-by: Robert Varga --- .../NormalizedNodeOutputStreamWriter.java | 2 +- .../DataTreeChangeListenerActor.java | 89 ++++++++++++++ .../DataTreeChangeListenerProxy.java | 114 ++++++++++++++++++ ...taTreeChangeListenerRegistrationActor.java | 57 +++++++++ .../DataTreeChangeListenerSupport.java | 94 +++++++++++++++ .../DelayedDataTreeListenerRegistration.java | 53 ++++++++ .../cluster/datastore/DelegateFactory.java | 18 +++ .../datastore/DistributedDataStore.java | 19 ++- .../ForwardingDataTreeChangeListener.java | 34 ++++++ .../datastore/LeaderLocalDelegateFactory.java | 26 ++++ .../controller/cluster/datastore/Shard.java | 9 +- ...oseDataTreeChangeListenerRegistration.java | 27 +++++ ...taTreeChangeListenerRegistrationReply.java | 28 +++++ .../datastore/messages/DataTreeChanged.java | 35 ++++++ .../messages/DataTreeChangedReply.java | 28 +++++ .../RegisterDataTreeChangeListener.java | 52 ++++++++ .../RegisterDataTreeChangeListenerReply.java | 28 +++++ 17 files changed, 710 insertions(+), 3 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java index d4aab036be..055ccfe0ce 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java @@ -66,7 +66,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri output = new DataOutputStream(stream); } - public NormalizedNodeOutputStreamWriter(DataOutput output) throws IOException { + public NormalizedNodeOutputStreamWriter(DataOutput output) { this.output = Preconditions.checkNotNull(output); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java new file mode 100644 index 0000000000..3f11909117 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import akka.japi.Creator; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating + * DataTreeChanged messages and dispatching their context to the user. + */ +final class DataTreeChangeListenerActor extends AbstractUntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class); + private final DOMDataTreeChangeListener listener; + private boolean notificationsEnabled = false; + + private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) { + this.listener = Preconditions.checkNotNull(listener); + } + + @Override + protected void handleReceive(final Object message) { + if (message instanceof DataTreeChanged) { + dataChanged((DataTreeChanged)message); + } else if (message instanceof EnableNotification) { + enableNotification((EnableNotification) message); + } + } + + private void dataChanged(final DataTreeChanged message) { + // Do nothing if notifications are not enabled + if (!notificationsEnabled) { + LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener); + return; + } + + LOG.debug("Sending change notification {} to listener {}", message.getChanges(), listener); + + try { + this.listener.onDataTreeChanged(message.getChanges()); + } catch (Exception e) { + LOG.error("Error notifying listener {}", this.listener, e); + } + + // TODO: do we really need this? + // It seems the sender is never null but it doesn't hurt to check. If the caller passes in + // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor. + if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(DataTreeChangedReply.getInstance(), getSelf()); + } + } + + private void enableNotification(final EnableNotification message) { + notificationsEnabled = message.isEnabled(); + LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"), + listener); + } + + public static Props props(final DOMDataTreeChangeListener listener) { + return Props.create(new DataTreeChangeListenerCreator(listener)); + } + + private static final class DataTreeChangeListenerCreator implements Creator { + private static final long serialVersionUID = 1L; + private final DOMDataTreeChangeListener listener; + + DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) { + this.listener = Preconditions.checkNotNull(listener); + } + + @Override + public DataTreeChangeListenerActor create() { + return new DataTreeChangeListenerActor(listener); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java new file mode 100644 index 0000000000..124724b9c2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.PoisonPill; +import akka.dispatch.OnComplete; +import com.google.common.base.Preconditions; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +/** + * Proxy class for holding required state to lazily instantiate a listener registration with an + * asynchronously-discovered actor. + * + * @param listener type + */ +final class DataTreeChangeListenerProxy extends AbstractListenerRegistration { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class); + private final ActorRef dataChangeListenerActor; + private final ActorContext actorContext; + + @GuardedBy("this") + private ActorSelection listenerRegistrationActor; + + public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) { + super(listener); + this.actorContext = Preconditions.checkNotNull(actorContext); + this.dataChangeListenerActor = actorContext.getActorSystem().actorOf( + DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath())); + } + + @Override + protected synchronized void removeRegistration() { + if (listenerRegistrationActor != null) { + listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender()); + listenerRegistrationActor = null; + } + + dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + void init(final String shardName, final YangInstanceIdentifier treeId) { + Future findFuture = actorContext.findLocalShardAsync(shardName); + findFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorRef shard) { + if (failure instanceof LocalShardNotFoundException) { + LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " + + "cannot be registered", shardName, getInstance(), treeId); + } else if (failure != null) { + LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " + + "cannot be registered: {}", shardName, getInstance(), treeId, failure); + } else { + doRegistration(shard, treeId); + } + } + }, actorContext.getClientDispatcher()); + } + + private void setListenerRegistrationActor(final ActorSelection actor) { + if (actor == null) { + LOG.debug("Ignoring null actor on {}", this); + return; + } + + synchronized (this) { + if (!isClosed()) { + this.listenerRegistrationActor = actor; + return; + } + } + + // This registration has already been closed, notify the actor + actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null); + } + + private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) { + + Future future = actorContext.executeOperationAsync(shard, + new RegisterDataTreeChangeListener(path, dataChangeListenerActor), + actorContext.getDatastoreContext().getShardInitializationTimeout()); + + future.onComplete(new OnComplete(){ + @Override + public void onComplete(final Throwable failure, final Object result) { + if (failure != null) { + LOG.error("Failed to register DataTreeChangeListener {} at path {}", + getInstance(), path.toString(), failure); + } else { + RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result; + setListenerRegistrationActor(actorContext.actorSelection( + reply.getListenerRegistrationPath().path())); + } + } + }, actorContext.getClientDispatcher()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java new file mode 100644 index 0000000000..7d0117f8e1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.japi.Creator; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; + +/** + * Actor co-located with a shard. It exists only to terminate the registration when + * asked to do so via {@link CloseDataTreeChangeListenerRegistration}. + */ +public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor { + private final ListenerRegistration registration; + + public DataTreeChangeListenerRegistrationActor(final ListenerRegistration registration) { + this.registration = Preconditions.checkNotNull(registration); + } + + @Override + protected void handleReceive(Object message) throws Exception { + if (message instanceof CloseDataTreeChangeListenerRegistration) { + registration.close(); + getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } + } + + public static Props props(final ListenerRegistration registration) { + return Props.create(new DataTreeChangeListenerRegistrationCreator(registration)); + } + + private static final class DataTreeChangeListenerRegistrationCreator implements Creator { + private static final long serialVersionUID = 1L; + final ListenerRegistration registration; + + DataTreeChangeListenerRegistrationCreator(ListenerRegistration registration) { + this.registration = Preconditions.checkNotNull(registration); + } + + @Override + public DataTreeChangeListenerRegistrationActor create() { + return new DataTreeChangeListenerRegistrationActor(registration); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java new file mode 100644 index 0000000000..afce4df546 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory> { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class); + private final ArrayList delayedRegistrations = new ArrayList<>(); + private final Collection actors = new ArrayList<>(); + private final Shard shard; + + DataTreeChangeListenerSupport(final Shard shard) { + this.shard = Preconditions.checkNotNull(shard); + } + + @Override + void onLeadershipChange(final boolean isLeader) { + if (isLeader) { + for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) { + reg.createDelegate(this); + } + delayedRegistrations.clear(); + delayedRegistrations.trimToSize(); + } + + final EnableNotification msg = new EnableNotification(isLeader); + for (ActorSelection dataChangeListener : actors) { + dataChangeListener.tell(msg, shard.getSelf()); + } + } + + @Override + void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) { + LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", shard.persistenceId(), registerTreeChangeListener.getPath(), isLeader); + + final ListenerRegistration registration; + if (!isLeader) { + LOG.debug("{}: Shard is not the leader - delaying registration", shard.persistenceId()); + + DelayedDataTreeListenerRegistration delayedReg = + new DelayedDataTreeListenerRegistration(registerTreeChangeListener); + delayedRegistrations.add(delayedReg); + registration = delayedReg; + } else { + registration = createDelegate(registerTreeChangeListener); + } + + ActorRef listenerRegistration = shard.getContext().actorOf( + DataTreeChangeListenerRegistrationActor.props(registration)); + + LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", + shard.persistenceId(), listenerRegistration.path()); + + shard.getSender().tell(new RegisterDataTreeChangeListenerReply(listenerRegistration), shard.getSelf()); + } + + @Override + ListenerRegistration createDelegate(final RegisterDataTreeChangeListener message) { + ActorSelection dataChangeListenerPath = shard.getContext().system().actorSelection( + message.getDataTreeChangeListenerPath().path()); + + // Notify the listener if notifications should be enabled or not + // If this shard is the leader then it will enable notifications else + // it will not + dataChangeListenerPath.tell(new EnableNotification(true), shard.getSelf()); + + // Now store a reference to the data change listener so it can be notified + // at a later point if notifications should be enabled or disabled + actors.add(dataChangeListenerPath); + + DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath); + + LOG.debug("{}: Registering for path {}", shard.persistenceId(), message.getPath()); + + return shard.getDataStore().registerTreeChangeListener(message.getPath(), listener); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java new file mode 100644 index 0000000000..b3ae8a3ca2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; + +/** + * Intermediate proxy registration returned to the user when we cannot + * instantiate the registration immediately. It provides a bridge to + * a real registration which may materialize at some point in the future. + */ +final class DelayedDataTreeListenerRegistration implements ListenerRegistration { + private final RegisterDataTreeChangeListener registerTreeChangeListener; + private volatile ListenerRegistration delegate; + @GuardedBy("this") + private boolean closed; + + DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) { + this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener); + } + + synchronized void createDelegate(final DelegateFactory> factory) { + if (!closed) { + this.delegate = factory.createDelegate(registerTreeChangeListener); + } + } + + @Override + public DOMDataTreeChangeListener getInstance() { + final ListenerRegistration d = delegate; + return d == null ? null : d.getInstance(); + } + + @Override + public synchronized void close() { + if (!closed) { + closed = true; + if (delegate != null) { + delegate.close(); + } + } + } +} + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java new file mode 100644 index 0000000000..e6702d90f1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +/** + * Base class for factories instantiating delegates. + * + * delegate type + * message type + */ +abstract class DelegateFactory { + abstract D createDelegate(M message); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index c79de94567..69c127f289 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -21,10 +21,12 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory; * */ public class DistributedDataStore implements DOMStore, SchemaContextListener, - DatastoreContextConfigAdminOverlay.Listener, AutoCloseable { + DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private static final String UNKNOWN_TYPE = "unknown"; @@ -125,6 +127,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, return listenerRegistrationProxy; } + @Override + public ListenerRegistration registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) { + Preconditions.checkNotNull(treeId, "treeId should not be null"); + Preconditions.checkNotNull(listener, "listener should not be null"); + + final String shardName = ShardStrategyFactory.getStrategy(treeId).findShard(treeId); + LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); + + final DataTreeChangeListenerProxy listenerRegistrationProxy = + new DataTreeChangeListenerProxy(actorContext, listener); + listenerRegistrationProxy.init(shardName, treeId); + + return listenerRegistrationProxy; + } + @Override public DOMStoreTransactionChain createTransactionChain() { return new TransactionChainProxy(actorContext); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java new file mode 100644 index 0000000000..7ca21d4d8a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import java.util.Collection; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * Internal implementation of a {@link DOMDataTreeChangeListener} which + * encapsulates received notifications into a {@link DataTreeChanged} + * message and forwards them towards the client's {@link DataTreeChangeListenerActor}. + */ +final class ForwardingDataTreeChangeListener implements DOMDataTreeChangeListener { + private final ActorSelection actor; + + ForwardingDataTreeChangeListener(final ActorSelection actor) { + this.actor = Preconditions.checkNotNull(actor, "actor should not be null"); + } + + @Override + public void onDataTreeChanged(Collection changes) { + actor.tell(new DataTreeChanged(changes), ActorRef.noSender()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java new file mode 100644 index 0000000000..891c0bf6d4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +/** + * Base class for factories instantiating delegates which are local to the + * shard leader. + * + * delegate type + * message type + */ +abstract class LeaderLocalDelegateFactory extends DelegateFactory { + /** + * Invoked whenever the local shard's leadership role changes. + * + * @param isLeader true if the shard has become leader, false if it has + * become a follower. + */ + abstract void onLeadershipChange(boolean isLeader); + abstract void onMessage(M message, boolean isLeader); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c04256a28e..6868cc15cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -56,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; @@ -141,6 +142,8 @@ public class Shard extends RaftActor { private final String txnDispatcherPath; + private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); + protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); @@ -158,7 +161,7 @@ public class Shard extends RaftActor { store = InMemoryDOMDataStoreFactory.create(name.toString(), null, datastoreContext.getDataStoreProperties()); - if(schemaContext != null) { + if (schemaContext != null) { store.onGlobalContextUpdated(schemaContext); } @@ -272,6 +275,8 @@ public class Shard extends RaftActor { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); } else if (message instanceof RegisterChangeListener) { registerChangeListener((RegisterChangeListener) message); + } else if (message instanceof RegisterDataTreeChangeListener) { + treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader()); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext((UpdateSchemaContext) message); } else if (message instanceof PeerAddressResolved) { @@ -826,6 +831,8 @@ public class Shard extends RaftActor { @Override protected void onStateChanged() { boolean isLeader = isLeader(); + treeChangeSupport.onLeadershipChange(isLeader); + for (ActorSelection dataChangeListener : dataChangeListeners) { dataChangeListener.tell(new EnableNotification(isLeader), getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java new file mode 100644 index 0000000000..032f4c10b0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.ObjectStreamException; +import java.io.Serializable; + +public final class CloseDataTreeChangeListenerRegistration implements Serializable { + private static final long serialVersionUID = 1L; + private static final CloseDataTreeChangeListenerRegistration INSTANCE = new CloseDataTreeChangeListenerRegistration(); + + private CloseDataTreeChangeListenerRegistration() { + } + + public static CloseDataTreeChangeListenerRegistration getInstance() { + return INSTANCE; + } + + private Object readResolve() throws ObjectStreamException { + return INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java new file mode 100644 index 0000000000..9d83fac27c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.ObjectStreamException; +import java.io.Serializable; + +public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable { + private static final long serialVersionUID = 1L; + private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE = new CloseDataTreeChangeListenerRegistrationReply(); + + private CloseDataTreeChangeListenerRegistrationReply() { + // Use getInstance() instead + } + + public static CloseDataTreeChangeListenerRegistrationReply getInstance() { + return INSTANCE; + } + + private Object readResolve() throws ObjectStreamException { + return INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java new file mode 100644 index 0000000000..919f9448a9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * A message about a DataTree having been changed. The message is not + * serializable on purpose. For delegating the change across cluster nodes, + * this needs to be intercepted by a local agent and forwarded as + * a {@link DataTreeDelta}. + */ +public final class DataTreeChanged { + private final Collection changes; + + public DataTreeChanged(final Collection changes) { + this.changes = Preconditions.checkNotNull(changes); + } + + /** + * Return the data changes. + * + * @return Change events + */ + public Collection getChanges() { + return changes; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java new file mode 100644 index 0000000000..e4a8d74a7d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.ObjectStreamException; +import java.io.Serializable; + +public final class DataTreeChangedReply implements Serializable { + private static final long serialVersionUID = 1L; + private static final DataTreeChangedReply INSTANCE = new DataTreeChangedReply(); + + private DataTreeChangedReply() { + // Use getInstance() instead + } + + public static DataTreeChangedReply getInstance() { + return INSTANCE; + } + + private Object readResolve() throws ObjectStreamException { + return INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java new file mode 100644 index 0000000000..941336e630 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard + * leader. + */ +public final class RegisterDataTreeChangeListener implements Externalizable { + private static final long serialVersionUID = 1L; + private ActorRef dataTreeChangeListenerPath; + private YangInstanceIdentifier path; + + public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) { + this.path = Preconditions.checkNotNull(path); + this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath); + } + + public YangInstanceIdentifier getPath() { + return path; + } + + public ActorRef getDataTreeChangeListenerPath() { + return dataTreeChangeListenerPath; + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeObject(dataTreeChangeListenerPath); + SerializationUtils.serializePath(path, out); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + dataTreeChangeListenerPath = (ActorRef) in.readObject(); + path = SerializationUtils.deserializePath(in); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java new file mode 100644 index 0000000000..88682ae7f3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.io.Serializable; + +/** + * Successful reply to a {@link RegisterDataTreeChangeListener} request. + */ +public final class RegisterDataTreeChangeListenerReply implements Serializable { + private static final long serialVersionUID = 1L; + private final ActorRef listenerRegistrationPath; + + public RegisterDataTreeChangeListenerReply(final ActorRef listenerRegistrationPath) { + this.listenerRegistrationPath = Preconditions.checkNotNull(listenerRegistrationPath); + } + + public ActorRef getListenerRegistrationPath() { + return listenerRegistrationPath; + } +} -- 2.36.6