Merge "BUG-2673: make CDS implement DOMDataTreeChangeListener"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 31 Mar 2015 17:27:52 +0000 (17:27 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 31 Mar 2015 17:27:53 +0000 (17:27 +0000)
17 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java [new file with mode: 0644]

index d4aab03..055ccfe 100644 (file)
@@ -66,7 +66,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
         output = new DataOutputStream(stream);
     }
 
-    public NormalizedNodeOutputStreamWriter(DataOutput output) throws IOException {
+    public NormalizedNodeOutputStreamWriter(DataOutput output) {
         this.output = Preconditions.checkNotNull(output);
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java
new file mode 100644 (file)
index 0000000..3f11909
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
+ * DataTreeChanged messages and dispatching their context to the user.
+ */
+final class DataTreeChangeListenerActor extends AbstractUntypedActor {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class);
+    private final DOMDataTreeChangeListener listener;
+    private boolean notificationsEnabled = false;
+
+    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+        this.listener = Preconditions.checkNotNull(listener);
+    }
+
+    @Override
+    protected void handleReceive(final Object message) {
+        if (message instanceof DataTreeChanged) {
+            dataChanged((DataTreeChanged)message);
+        } else if (message instanceof EnableNotification) {
+            enableNotification((EnableNotification) message);
+        }
+    }
+
+    private void dataChanged(final DataTreeChanged message) {
+        // Do nothing if notifications are not enabled
+        if (!notificationsEnabled) {
+            LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener);
+            return;
+        }
+
+        LOG.debug("Sending change notification {} to listener {}", message.getChanges(), listener);
+
+        try {
+            this.listener.onDataTreeChanged(message.getChanges());
+        } catch (Exception e) {
+            LOG.error("Error notifying listener {}", this.listener, e);
+        }
+
+        // TODO: do we really need this?
+        // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
+        // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
+        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(DataTreeChangedReply.getInstance(), getSelf());
+        }
+    }
+
+    private void enableNotification(final EnableNotification message) {
+        notificationsEnabled = message.isEnabled();
+        LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+                listener);
+    }
+
+    public static Props props(final DOMDataTreeChangeListener listener) {
+        return Props.create(new DataTreeChangeListenerCreator(listener));
+    }
+
+    private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
+        private static final long serialVersionUID = 1L;
+        private final DOMDataTreeChangeListener listener;
+
+        DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
+            this.listener = Preconditions.checkNotNull(listener);
+        }
+
+        @Override
+        public DataTreeChangeListenerActor create() {
+            return new DataTreeChangeListenerActor(listener);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java
new file mode 100644 (file)
index 0000000..124724b
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Proxy class for holding required state to lazily instantiate a listener registration with an
+ * asynchronously-discovered actor.
+ *
+ * @param <T> listener type
+ */
+final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
+    private final ActorRef dataChangeListenerActor;
+    private final ActorContext actorContext;
+
+    @GuardedBy("this")
+    private ActorSelection listenerRegistrationActor;
+
+    public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+        super(listener);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+            DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+    }
+
+    @Override
+    protected synchronized void removeRegistration() {
+        if (listenerRegistrationActor != null) {
+            listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
+            listenerRegistrationActor = null;
+        }
+
+        dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    void init(final String shardName, final YangInstanceIdentifier treeId) {
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+        findFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable failure, final ActorRef shard) {
+                if (failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " +
+                            "cannot be registered", shardName, getInstance(), treeId);
+                } else if (failure != null) {
+                    LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " +
+                            "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+                } else {
+                    doRegistration(shard, treeId);
+                }
+            }
+        }, actorContext.getClientDispatcher());
+    }
+
+    private void setListenerRegistrationActor(final ActorSelection actor) {
+        if (actor == null) {
+            LOG.debug("Ignoring null actor on {}", this);
+            return;
+        }
+
+        synchronized (this) {
+            if (!isClosed()) {
+                this.listenerRegistrationActor = actor;
+                return;
+            }
+        }
+
+        // This registration has already been closed, notify the actor
+        actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
+    }
+
+    private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+
+        Future<Object> future = actorContext.executeOperationAsync(shard,
+                new RegisterDataTreeChangeListener(path, dataChangeListenerActor),
+                actorContext.getDatastoreContext().getShardInitializationTimeout());
+
+        future.onComplete(new OnComplete<Object>(){
+            @Override
+            public void onComplete(final Throwable failure, final Object result) {
+                if (failure != null) {
+                    LOG.error("Failed to register DataTreeChangeListener {} at path {}",
+                            getInstance(), path.toString(), failure);
+                } else {
+                    RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
+                    setListenerRegistrationActor(actorContext.actorSelection(
+                            reply.getListenerRegistrationPath().path()));
+                }
+            }
+        }, actorContext.getClientDispatcher());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java
new file mode 100644 (file)
index 0000000..7d0117f
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Actor co-located with a shard. It exists only to terminate the registration when
+ * asked to do so via {@link CloseDataTreeChangeListenerRegistration}.
+ */
+public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor {
+    private final ListenerRegistration<DOMDataTreeChangeListener> registration;
+
+    public DataTreeChangeListenerRegistrationActor(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
+        this.registration = Preconditions.checkNotNull(registration);
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof CloseDataTreeChangeListenerRegistration) {
+            registration.close();
+            getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf());
+            getSelf().tell(PoisonPill.getInstance(), getSelf());
+        }
+    }
+
+    public static Props props(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
+        return Props.create(new DataTreeChangeListenerRegistrationCreator(registration));
+    }
+
+    private static final class DataTreeChangeListenerRegistrationCreator implements Creator<DataTreeChangeListenerRegistrationActor> {
+        private static final long serialVersionUID = 1L;
+        final ListenerRegistration<DOMDataTreeChangeListener> registration;
+
+        DataTreeChangeListenerRegistrationCreator(ListenerRegistration<DOMDataTreeChangeListener> registration) {
+            this.registration = Preconditions.checkNotNull(registration);
+        }
+
+        @Override
+        public DataTreeChangeListenerRegistrationActor create() {
+            return new DataTreeChangeListenerRegistrationActor(registration);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
new file mode 100644 (file)
index 0000000..afce4df
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
+    private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
+    private final Collection<ActorSelection> actors = new ArrayList<>();
+    private final Shard shard;
+
+    DataTreeChangeListenerSupport(final Shard shard) {
+        this.shard = Preconditions.checkNotNull(shard);
+    }
+
+    @Override
+    void onLeadershipChange(final boolean isLeader) {
+        if (isLeader) {
+            for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
+                reg.createDelegate(this);
+            }
+            delayedRegistrations.clear();
+            delayedRegistrations.trimToSize();
+        }
+
+        final EnableNotification msg = new EnableNotification(isLeader);
+        for (ActorSelection dataChangeListener : actors) {
+            dataChangeListener.tell(msg, shard.getSelf());
+        }
+    }
+
+    @Override
+    void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) {
+        LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", shard.persistenceId(), registerTreeChangeListener.getPath(), isLeader);
+
+        final ListenerRegistration<DOMDataTreeChangeListener> registration;
+        if (!isLeader) {
+            LOG.debug("{}: Shard is not the leader - delaying registration", shard.persistenceId());
+
+            DelayedDataTreeListenerRegistration delayedReg =
+                    new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
+            delayedRegistrations.add(delayedReg);
+            registration = delayedReg;
+        } else {
+            registration = createDelegate(registerTreeChangeListener);
+        }
+
+        ActorRef listenerRegistration = shard.getContext().actorOf(
+                DataTreeChangeListenerRegistrationActor.props(registration));
+
+        LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+            shard.persistenceId(), listenerRegistration.path());
+
+        shard.getSender().tell(new RegisterDataTreeChangeListenerReply(listenerRegistration), shard.getSelf());
+    }
+
+    @Override
+    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(final RegisterDataTreeChangeListener message) {
+        ActorSelection dataChangeListenerPath = shard.getContext().system().actorSelection(
+            message.getDataTreeChangeListenerPath().path());
+
+        // Notify the listener if notifications should be enabled or not
+        // If this shard is the leader then it will enable notifications else
+        // it will not
+        dataChangeListenerPath.tell(new EnableNotification(true), shard.getSelf());
+
+        // Now store a reference to the data change listener so it can be notified
+        // at a later point if notifications should be enabled or disabled
+        actors.add(dataChangeListenerPath);
+
+        DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
+
+        LOG.debug("{}: Registering for path {}", shard.persistenceId(), message.getPath());
+
+        return shard.getDataStore().registerTreeChangeListener(message.getPath(), listener);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java
new file mode 100644 (file)
index 0000000..b3ae8a3
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Intermediate proxy registration returned to the user when we cannot
+ * instantiate the registration immediately. It provides a bridge to
+ * a real registration which may materialize at some point in the future.
+ */
+final class DelayedDataTreeListenerRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
+    private final RegisterDataTreeChangeListener registerTreeChangeListener;
+    private volatile ListenerRegistration<DOMDataTreeChangeListener> delegate;
+    @GuardedBy("this")
+    private boolean closed;
+
+    DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
+        this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
+    }
+
+    synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> factory) {
+        if (!closed) {
+            this.delegate = factory.createDelegate(registerTreeChangeListener);
+        }
+    }
+
+    @Override
+    public DOMDataTreeChangeListener getInstance() {
+        final ListenerRegistration<DOMDataTreeChangeListener> d = delegate;
+        return d == null ? null : d.getInstance();
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!closed) {
+            closed = true;
+            if (delegate != null) {
+                delegate.close();
+            }
+        }
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
new file mode 100644 (file)
index 0000000..e6702d9
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Base class for factories instantiating delegates.
+ *
+ * <D> delegate type
+ * <M> message type
+ */
+abstract class DelegateFactory<M, D> {
+    abstract D createDelegate(M message);
+}
index c79de94..69c127f 100644 (file)
@@ -21,10 +21,12 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class DistributedDataStore implements DOMStore, SchemaContextListener,
-        DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
+        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     private static final String UNKNOWN_TYPE = "unknown";
@@ -125,6 +127,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         return listenerRegistrationProxy;
     }
 
+    @Override
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) {
+        Preconditions.checkNotNull(treeId, "treeId should not be null");
+        Preconditions.checkNotNull(listener, "listener should not be null");
+
+        final String shardName = ShardStrategyFactory.getStrategy(treeId).findShard(treeId);
+        LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
+
+        final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
+                new DataTreeChangeListenerProxy<L>(actorContext, listener);
+        listenerRegistrationProxy.init(shardName, treeId);
+
+        return listenerRegistrationProxy;
+    }
+
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return new TransactionChainProxy(actorContext);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java
new file mode 100644 (file)
index 0000000..7ca21d4
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Internal implementation of a {@link DOMDataTreeChangeListener} which
+ * encapsulates received notifications into a {@link DataTreeChanged}
+ * message and forwards them towards the client's {@link DataTreeChangeListenerActor}.
+ */
+final class ForwardingDataTreeChangeListener implements DOMDataTreeChangeListener {
+    private final ActorSelection actor;
+
+    ForwardingDataTreeChangeListener(final ActorSelection actor) {
+        this.actor = Preconditions.checkNotNull(actor, "actor should not be null");
+    }
+
+    @Override
+    public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+        actor.tell(new DataTreeChanged(changes), ActorRef.noSender());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
new file mode 100644 (file)
index 0000000..891c0bf
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Base class for factories instantiating delegates which are local to the
+ * shard leader.
+ *
+ * <D> delegate type
+ * <M> message type
+ */
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+    /**
+     * Invoked whenever the local shard's leadership role changes.
+     *
+     * @param isLeader true if the shard has become leader, false if it has
+     *                 become a follower.
+     */
+    abstract void onLeadershipChange(boolean isLeader);
+    abstract void onMessage(M message, boolean isLeader);
+}
index c04256a..6868cc1 100644 (file)
@@ -56,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
@@ -141,6 +142,8 @@ public class Shard extends RaftActor {
 
     private final String txnDispatcherPath;
 
+    private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
+
     protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
@@ -158,7 +161,7 @@ public class Shard extends RaftActor {
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
 
-        if(schemaContext != null) {
+        if (schemaContext != null) {
             store.onGlobalContextUpdated(schemaContext);
         }
 
@@ -272,6 +275,8 @@ public class Shard extends RaftActor {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
             } else if (message instanceof RegisterChangeListener) {
                 registerChangeListener((RegisterChangeListener) message);
+            } else if (message instanceof RegisterDataTreeChangeListener) {
+                treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
             } else if (message instanceof UpdateSchemaContext) {
                 updateSchemaContext((UpdateSchemaContext) message);
             } else if (message instanceof PeerAddressResolved) {
@@ -826,6 +831,8 @@ public class Shard extends RaftActor {
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
+        treeChangeSupport.onLeadershipChange(isLeader);
+
         for (ActorSelection dataChangeListener : dataChangeListeners) {
             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..032f4c1
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class CloseDataTreeChangeListenerRegistration implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final CloseDataTreeChangeListenerRegistration INSTANCE = new CloseDataTreeChangeListenerRegistration();
+
+    private CloseDataTreeChangeListenerRegistration() {
+    }
+
+    public static CloseDataTreeChangeListenerRegistration getInstance() {
+        return INSTANCE;
+    }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java
new file mode 100644 (file)
index 0000000..9d83fac
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE = new CloseDataTreeChangeListenerRegistrationReply();
+
+    private CloseDataTreeChangeListenerRegistrationReply() {
+        // Use getInstance() instead
+    }
+
+    public static CloseDataTreeChangeListenerRegistrationReply getInstance() {
+        return INSTANCE;
+    }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java
new file mode 100644 (file)
index 0000000..919f944
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * A message about a DataTree having been changed. The message is not
+ * serializable on purpose. For delegating the change across cluster nodes,
+ * this needs to be intercepted by a local agent and forwarded as
+ * a {@link DataTreeDelta}.
+ */
+public final class DataTreeChanged {
+    private final Collection<DataTreeCandidate> changes;
+
+    public DataTreeChanged(final Collection<DataTreeCandidate> changes) {
+        this.changes = Preconditions.checkNotNull(changes);
+    }
+
+    /**
+     * Return the data changes.
+     *
+     * @return Change events
+     */
+    public Collection<DataTreeCandidate> getChanges() {
+        return changes;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java
new file mode 100644 (file)
index 0000000..e4a8d74
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class DataTreeChangedReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final DataTreeChangedReply INSTANCE = new DataTreeChangedReply();
+
+    private DataTreeChangedReply() {
+        // Use getInstance() instead
+    }
+
+    public static DataTreeChangedReply getInstance() {
+        return INSTANCE;
+    }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java
new file mode 100644 (file)
index 0000000..941336e
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard
+ * leader.
+ */
+public final class RegisterDataTreeChangeListener implements Externalizable {
+    private static final long serialVersionUID = 1L;
+    private ActorRef dataTreeChangeListenerPath;
+    private YangInstanceIdentifier path;
+
+    public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) {
+        this.path = Preconditions.checkNotNull(path);
+        this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath);
+    }
+
+    public YangInstanceIdentifier getPath() {
+        return path;
+    }
+
+    public ActorRef getDataTreeChangeListenerPath() {
+        return dataTreeChangeListenerPath;
+    }
+
+    @Override
+    public void writeExternal(final ObjectOutput out) throws IOException {
+        out.writeObject(dataTreeChangeListenerPath);
+        SerializationUtils.serializePath(path, out);
+    }
+
+    @Override
+    public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+        dataTreeChangeListenerPath = (ActorRef) in.readObject();
+        path = SerializationUtils.deserializePath(in);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java
new file mode 100644 (file)
index 0000000..88682ae
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * Successful reply to a {@link RegisterDataTreeChangeListener} request.
+ */
+public final class RegisterDataTreeChangeListenerReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final ActorRef listenerRegistrationPath;
+
+    public RegisterDataTreeChangeListenerReply(final ActorRef listenerRegistrationPath) {
+        this.listenerRegistrationPath = Preconditions.checkNotNull(listenerRegistrationPath);
+    }
+
+    public ActorRef getListenerRegistrationPath() {
+        return listenerRegistrationPath;
+    }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.