From: Tom Pantelis Date: Tue, 31 Mar 2015 17:27:52 +0000 (+0000) Subject: Merge "BUG-2673: make CDS implement DOMDataTreeChangeListener" X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=8f0395b38dbfdf6b3164cb68b1cba651b1075a07;hp=52da008f461eb10786758eb45af16c1327d5b974 Merge "BUG-2673: make CDS implement DOMDataTreeChangeListener" --- diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java index d4aab036be..055ccfe0ce 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java @@ -66,7 +66,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri output = new DataOutputStream(stream); } - public NormalizedNodeOutputStreamWriter(DataOutput output) throws IOException { + public NormalizedNodeOutputStreamWriter(DataOutput output) { this.output = Preconditions.checkNotNull(output); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java new file mode 100644 index 0000000000..3f11909117 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import akka.japi.Creator; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating + * DataTreeChanged messages and dispatching their context to the user. + */ +final class DataTreeChangeListenerActor extends AbstractUntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class); + private final DOMDataTreeChangeListener listener; + private boolean notificationsEnabled = false; + + private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) { + this.listener = Preconditions.checkNotNull(listener); + } + + @Override + protected void handleReceive(final Object message) { + if (message instanceof DataTreeChanged) { + dataChanged((DataTreeChanged)message); + } else if (message instanceof EnableNotification) { + enableNotification((EnableNotification) message); + } + } + + private void dataChanged(final DataTreeChanged message) { + // Do nothing if notifications are not enabled + if (!notificationsEnabled) { + LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener); + return; + } + + LOG.debug("Sending change notification {} to listener {}", message.getChanges(), listener); + + try { + this.listener.onDataTreeChanged(message.getChanges()); + } catch (Exception e) { + LOG.error("Error notifying listener {}", this.listener, e); + } + + // TODO: do we really need this? + // It seems the sender is never null but it doesn't hurt to check. If the caller passes in + // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor. + if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(DataTreeChangedReply.getInstance(), getSelf()); + } + } + + private void enableNotification(final EnableNotification message) { + notificationsEnabled = message.isEnabled(); + LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"), + listener); + } + + public static Props props(final DOMDataTreeChangeListener listener) { + return Props.create(new DataTreeChangeListenerCreator(listener)); + } + + private static final class DataTreeChangeListenerCreator implements Creator { + private static final long serialVersionUID = 1L; + private final DOMDataTreeChangeListener listener; + + DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) { + this.listener = Preconditions.checkNotNull(listener); + } + + @Override + public DataTreeChangeListenerActor create() { + return new DataTreeChangeListenerActor(listener); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java new file mode 100644 index 0000000000..124724b9c2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.PoisonPill; +import akka.dispatch.OnComplete; +import com.google.common.base.Preconditions; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; + +/** + * Proxy class for holding required state to lazily instantiate a listener registration with an + * asynchronously-discovered actor. + * + * @param listener type + */ +final class DataTreeChangeListenerProxy extends AbstractListenerRegistration { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class); + private final ActorRef dataChangeListenerActor; + private final ActorContext actorContext; + + @GuardedBy("this") + private ActorSelection listenerRegistrationActor; + + public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) { + super(listener); + this.actorContext = Preconditions.checkNotNull(actorContext); + this.dataChangeListenerActor = actorContext.getActorSystem().actorOf( + DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath())); + } + + @Override + protected synchronized void removeRegistration() { + if (listenerRegistrationActor != null) { + listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender()); + listenerRegistrationActor = null; + } + + dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + void init(final String shardName, final YangInstanceIdentifier treeId) { + Future findFuture = actorContext.findLocalShardAsync(shardName); + findFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final ActorRef shard) { + if (failure instanceof LocalShardNotFoundException) { + LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " + + "cannot be registered", shardName, getInstance(), treeId); + } else if (failure != null) { + LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " + + "cannot be registered: {}", shardName, getInstance(), treeId, failure); + } else { + doRegistration(shard, treeId); + } + } + }, actorContext.getClientDispatcher()); + } + + private void setListenerRegistrationActor(final ActorSelection actor) { + if (actor == null) { + LOG.debug("Ignoring null actor on {}", this); + return; + } + + synchronized (this) { + if (!isClosed()) { + this.listenerRegistrationActor = actor; + return; + } + } + + // This registration has already been closed, notify the actor + actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null); + } + + private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) { + + Future future = actorContext.executeOperationAsync(shard, + new RegisterDataTreeChangeListener(path, dataChangeListenerActor), + actorContext.getDatastoreContext().getShardInitializationTimeout()); + + future.onComplete(new OnComplete(){ + @Override + public void onComplete(final Throwable failure, final Object result) { + if (failure != null) { + LOG.error("Failed to register DataTreeChangeListener {} at path {}", + getInstance(), path.toString(), failure); + } else { + RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result; + setListenerRegistrationActor(actorContext.actorSelection( + reply.getListenerRegistrationPath().path())); + } + } + }, actorContext.getClientDispatcher()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java new file mode 100644 index 0000000000..7d0117f8e1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.japi.Creator; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration; +import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; + +/** + * Actor co-located with a shard. It exists only to terminate the registration when + * asked to do so via {@link CloseDataTreeChangeListenerRegistration}. + */ +public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor { + private final ListenerRegistration registration; + + public DataTreeChangeListenerRegistrationActor(final ListenerRegistration registration) { + this.registration = Preconditions.checkNotNull(registration); + } + + @Override + protected void handleReceive(Object message) throws Exception { + if (message instanceof CloseDataTreeChangeListenerRegistration) { + registration.close(); + getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf()); + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } + } + + public static Props props(final ListenerRegistration registration) { + return Props.create(new DataTreeChangeListenerRegistrationCreator(registration)); + } + + private static final class DataTreeChangeListenerRegistrationCreator implements Creator { + private static final long serialVersionUID = 1L; + final ListenerRegistration registration; + + DataTreeChangeListenerRegistrationCreator(ListenerRegistration registration) { + this.registration = Preconditions.checkNotNull(registration); + } + + @Override + public DataTreeChangeListenerRegistrationActor create() { + return new DataTreeChangeListenerRegistrationActor(registration); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java new file mode 100644 index 0000000000..afce4df546 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collection; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory> { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class); + private final ArrayList delayedRegistrations = new ArrayList<>(); + private final Collection actors = new ArrayList<>(); + private final Shard shard; + + DataTreeChangeListenerSupport(final Shard shard) { + this.shard = Preconditions.checkNotNull(shard); + } + + @Override + void onLeadershipChange(final boolean isLeader) { + if (isLeader) { + for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) { + reg.createDelegate(this); + } + delayedRegistrations.clear(); + delayedRegistrations.trimToSize(); + } + + final EnableNotification msg = new EnableNotification(isLeader); + for (ActorSelection dataChangeListener : actors) { + dataChangeListener.tell(msg, shard.getSelf()); + } + } + + @Override + void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) { + LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", shard.persistenceId(), registerTreeChangeListener.getPath(), isLeader); + + final ListenerRegistration registration; + if (!isLeader) { + LOG.debug("{}: Shard is not the leader - delaying registration", shard.persistenceId()); + + DelayedDataTreeListenerRegistration delayedReg = + new DelayedDataTreeListenerRegistration(registerTreeChangeListener); + delayedRegistrations.add(delayedReg); + registration = delayedReg; + } else { + registration = createDelegate(registerTreeChangeListener); + } + + ActorRef listenerRegistration = shard.getContext().actorOf( + DataTreeChangeListenerRegistrationActor.props(registration)); + + LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", + shard.persistenceId(), listenerRegistration.path()); + + shard.getSender().tell(new RegisterDataTreeChangeListenerReply(listenerRegistration), shard.getSelf()); + } + + @Override + ListenerRegistration createDelegate(final RegisterDataTreeChangeListener message) { + ActorSelection dataChangeListenerPath = shard.getContext().system().actorSelection( + message.getDataTreeChangeListenerPath().path()); + + // Notify the listener if notifications should be enabled or not + // If this shard is the leader then it will enable notifications else + // it will not + dataChangeListenerPath.tell(new EnableNotification(true), shard.getSelf()); + + // Now store a reference to the data change listener so it can be notified + // at a later point if notifications should be enabled or disabled + actors.add(dataChangeListenerPath); + + DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath); + + LOG.debug("{}: Registering for path {}", shard.persistenceId(), message.getPath()); + + return shard.getDataStore().registerTreeChangeListener(message.getPath(), listener); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java new file mode 100644 index 0000000000..b3ae8a3ca2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; + +/** + * Intermediate proxy registration returned to the user when we cannot + * instantiate the registration immediately. It provides a bridge to + * a real registration which may materialize at some point in the future. + */ +final class DelayedDataTreeListenerRegistration implements ListenerRegistration { + private final RegisterDataTreeChangeListener registerTreeChangeListener; + private volatile ListenerRegistration delegate; + @GuardedBy("this") + private boolean closed; + + DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) { + this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener); + } + + synchronized void createDelegate(final DelegateFactory> factory) { + if (!closed) { + this.delegate = factory.createDelegate(registerTreeChangeListener); + } + } + + @Override + public DOMDataTreeChangeListener getInstance() { + final ListenerRegistration d = delegate; + return d == null ? null : d.getInstance(); + } + + @Override + public synchronized void close() { + if (!closed) { + closed = true; + if (delegate != null) { + delegate.close(); + } + } + } +} + diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java new file mode 100644 index 0000000000..e6702d90f1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +/** + * Base class for factories instantiating delegates. + * + * delegate type + * message type + */ +abstract class DelegateFactory { + abstract D createDelegate(M message); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index c79de94567..69c127f289 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -21,10 +21,12 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory; * */ public class DistributedDataStore implements DOMStore, SchemaContextListener, - DatastoreContextConfigAdminOverlay.Listener, AutoCloseable { + DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); private static final String UNKNOWN_TYPE = "unknown"; @@ -125,6 +127,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, return listenerRegistrationProxy; } + @Override + public ListenerRegistration registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) { + Preconditions.checkNotNull(treeId, "treeId should not be null"); + Preconditions.checkNotNull(listener, "listener should not be null"); + + final String shardName = ShardStrategyFactory.getStrategy(treeId).findShard(treeId); + LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName); + + final DataTreeChangeListenerProxy listenerRegistrationProxy = + new DataTreeChangeListenerProxy(actorContext, listener); + listenerRegistrationProxy.init(shardName, treeId); + + return listenerRegistrationProxy; + } + @Override public DOMStoreTransactionChain createTransactionChain() { return new TransactionChainProxy(actorContext); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java new file mode 100644 index 0000000000..7ca21d4d8a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import java.util.Collection; +import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * Internal implementation of a {@link DOMDataTreeChangeListener} which + * encapsulates received notifications into a {@link DataTreeChanged} + * message and forwards them towards the client's {@link DataTreeChangeListenerActor}. + */ +final class ForwardingDataTreeChangeListener implements DOMDataTreeChangeListener { + private final ActorSelection actor; + + ForwardingDataTreeChangeListener(final ActorSelection actor) { + this.actor = Preconditions.checkNotNull(actor, "actor should not be null"); + } + + @Override + public void onDataTreeChanged(Collection changes) { + actor.tell(new DataTreeChanged(changes), ActorRef.noSender()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java new file mode 100644 index 0000000000..891c0bf6d4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +/** + * Base class for factories instantiating delegates which are local to the + * shard leader. + * + * delegate type + * message type + */ +abstract class LeaderLocalDelegateFactory extends DelegateFactory { + /** + * Invoked whenever the local shard's leadership role changes. + * + * @param isLeader true if the shard has become leader, false if it has + * become a follower. + */ + abstract void onLeadershipChange(boolean isLeader); + abstract void onMessage(M message, boolean isLeader); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c04256a28e..6868cc15cd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -56,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; @@ -141,6 +142,8 @@ public class Shard extends RaftActor { private final String txnDispatcherPath; + private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); + protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig())); @@ -158,7 +161,7 @@ public class Shard extends RaftActor { store = InMemoryDOMDataStoreFactory.create(name.toString(), null, datastoreContext.getDataStoreProperties()); - if(schemaContext != null) { + if (schemaContext != null) { store.onGlobalContextUpdated(schemaContext); } @@ -272,6 +275,8 @@ public class Shard extends RaftActor { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); } else if (message instanceof RegisterChangeListener) { registerChangeListener((RegisterChangeListener) message); + } else if (message instanceof RegisterDataTreeChangeListener) { + treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader()); } else if (message instanceof UpdateSchemaContext) { updateSchemaContext((UpdateSchemaContext) message); } else if (message instanceof PeerAddressResolved) { @@ -826,6 +831,8 @@ public class Shard extends RaftActor { @Override protected void onStateChanged() { boolean isLeader = isLeader(); + treeChangeSupport.onLeadershipChange(isLeader); + for (ActorSelection dataChangeListener : dataChangeListeners) { dataChangeListener.tell(new EnableNotification(isLeader), getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java new file mode 100644 index 0000000000..032f4c10b0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.ObjectStreamException; +import java.io.Serializable; + +public final class CloseDataTreeChangeListenerRegistration implements Serializable { + private static final long serialVersionUID = 1L; + private static final CloseDataTreeChangeListenerRegistration INSTANCE = new CloseDataTreeChangeListenerRegistration(); + + private CloseDataTreeChangeListenerRegistration() { + } + + public static CloseDataTreeChangeListenerRegistration getInstance() { + return INSTANCE; + } + + private Object readResolve() throws ObjectStreamException { + return INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java new file mode 100644 index 0000000000..9d83fac27c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.ObjectStreamException; +import java.io.Serializable; + +public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable { + private static final long serialVersionUID = 1L; + private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE = new CloseDataTreeChangeListenerRegistrationReply(); + + private CloseDataTreeChangeListenerRegistrationReply() { + // Use getInstance() instead + } + + public static CloseDataTreeChangeListenerRegistrationReply getInstance() { + return INSTANCE; + } + + private Object readResolve() throws ObjectStreamException { + return INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java new file mode 100644 index 0000000000..919f9448a9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * A message about a DataTree having been changed. The message is not + * serializable on purpose. For delegating the change across cluster nodes, + * this needs to be intercepted by a local agent and forwarded as + * a {@link DataTreeDelta}. + */ +public final class DataTreeChanged { + private final Collection changes; + + public DataTreeChanged(final Collection changes) { + this.changes = Preconditions.checkNotNull(changes); + } + + /** + * Return the data changes. + * + * @return Change events + */ + public Collection getChanges() { + return changes; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java new file mode 100644 index 0000000000..e4a8d74a7d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.ObjectStreamException; +import java.io.Serializable; + +public final class DataTreeChangedReply implements Serializable { + private static final long serialVersionUID = 1L; + private static final DataTreeChangedReply INSTANCE = new DataTreeChangedReply(); + + private DataTreeChangedReply() { + // Use getInstance() instead + } + + public static DataTreeChangedReply getInstance() { + return INSTANCE; + } + + private Object readResolve() throws ObjectStreamException { + return INSTANCE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java new file mode 100644 index 0000000000..941336e630 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard + * leader. + */ +public final class RegisterDataTreeChangeListener implements Externalizable { + private static final long serialVersionUID = 1L; + private ActorRef dataTreeChangeListenerPath; + private YangInstanceIdentifier path; + + public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) { + this.path = Preconditions.checkNotNull(path); + this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath); + } + + public YangInstanceIdentifier getPath() { + return path; + } + + public ActorRef getDataTreeChangeListenerPath() { + return dataTreeChangeListenerPath; + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeObject(dataTreeChangeListenerPath); + SerializationUtils.serializePath(path, out); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + dataTreeChangeListenerPath = (ActorRef) in.readObject(); + path = SerializationUtils.deserializePath(in); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java new file mode 100644 index 0000000000..88682ae7f3 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.io.Serializable; + +/** + * Successful reply to a {@link RegisterDataTreeChangeListener} request. + */ +public final class RegisterDataTreeChangeListenerReply implements Serializable { + private static final long serialVersionUID = 1L; + private final ActorRef listenerRegistrationPath; + + public RegisterDataTreeChangeListenerReply(final ActorRef listenerRegistrationPath) { + this.listenerRegistrationPath = Preconditions.checkNotNull(listenerRegistrationPath); + } + + public ActorRef getListenerRegistrationPath() { + return listenerRegistrationPath; + } +}