BUG-2673: make CDS implement DOMDataTreeChangeListener 90/16690/25
authorRobert Varga <rovarga@cisco.com>
Tue, 17 Mar 2015 10:18:53 +0000 (11:18 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 31 Mar 2015 08:54:17 +0000 (10:54 +0200)
This patch adds the base support for registering
DOMDataTreeChangeListeners. These are delivered only on the local node,
as efficient serialization requires interaction with the cluster
topology and shard leadership handoff. That functionality will be
delivered in a follow-up patch.

It also introduces a bit common infrastructure to be used by
DataChangeListener code, as it performs a very similar function.

Change-Id: Ifb91d08857684fb160fd923bc25c294d2fca4bc3
Signed-off-by: Robert Varga <rovarga@cisco.com>
17 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java [new file with mode: 0644]

index d4aab036be21df1734f4f88bc590b35030a71d21..055ccfe0ceeeed6ee101b60f4dcda6d0a2a83a9d 100644 (file)
@@ -66,7 +66,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
         output = new DataOutputStream(stream);
     }
 
         output = new DataOutputStream(stream);
     }
 
-    public NormalizedNodeOutputStreamWriter(DataOutput output) throws IOException {
+    public NormalizedNodeOutputStreamWriter(DataOutput output) {
         this.output = Preconditions.checkNotNull(output);
     }
 
         this.output = Preconditions.checkNotNull(output);
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerActor.java
new file mode 100644 (file)
index 0000000..3f11909
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
+ * DataTreeChanged messages and dispatching their context to the user.
+ */
+final class DataTreeChangeListenerActor extends AbstractUntypedActor {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class);
+    private final DOMDataTreeChangeListener listener;
+    private boolean notificationsEnabled = false;
+
+    private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+        this.listener = Preconditions.checkNotNull(listener);
+    }
+
+    @Override
+    protected void handleReceive(final Object message) {
+        if (message instanceof DataTreeChanged) {
+            dataChanged((DataTreeChanged)message);
+        } else if (message instanceof EnableNotification) {
+            enableNotification((EnableNotification) message);
+        }
+    }
+
+    private void dataChanged(final DataTreeChanged message) {
+        // Do nothing if notifications are not enabled
+        if (!notificationsEnabled) {
+            LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener);
+            return;
+        }
+
+        LOG.debug("Sending change notification {} to listener {}", message.getChanges(), listener);
+
+        try {
+            this.listener.onDataTreeChanged(message.getChanges());
+        } catch (Exception e) {
+            LOG.error("Error notifying listener {}", this.listener, e);
+        }
+
+        // TODO: do we really need this?
+        // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
+        // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
+        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(DataTreeChangedReply.getInstance(), getSelf());
+        }
+    }
+
+    private void enableNotification(final EnableNotification message) {
+        notificationsEnabled = message.isEnabled();
+        LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+                listener);
+    }
+
+    public static Props props(final DOMDataTreeChangeListener listener) {
+        return Props.create(new DataTreeChangeListenerCreator(listener));
+    }
+
+    private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
+        private static final long serialVersionUID = 1L;
+        private final DOMDataTreeChangeListener listener;
+
+        DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
+            this.listener = Preconditions.checkNotNull(listener);
+        }
+
+        @Override
+        public DataTreeChangeListenerActor create() {
+            return new DataTreeChangeListenerActor(listener);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerProxy.java
new file mode 100644 (file)
index 0000000..124724b
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Proxy class for holding required state to lazily instantiate a listener registration with an
+ * asynchronously-discovered actor.
+ *
+ * @param <T> listener type
+ */
+final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
+    private final ActorRef dataChangeListenerActor;
+    private final ActorContext actorContext;
+
+    @GuardedBy("this")
+    private ActorSelection listenerRegistrationActor;
+
+    public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+        super(listener);
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+            DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+    }
+
+    @Override
+    protected synchronized void removeRegistration() {
+        if (listenerRegistrationActor != null) {
+            listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
+            listenerRegistrationActor = null;
+        }
+
+        dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    void init(final String shardName, final YangInstanceIdentifier treeId) {
+        Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+        findFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(final Throwable failure, final ActorRef shard) {
+                if (failure instanceof LocalShardNotFoundException) {
+                    LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " +
+                            "cannot be registered", shardName, getInstance(), treeId);
+                } else if (failure != null) {
+                    LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " +
+                            "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+                } else {
+                    doRegistration(shard, treeId);
+                }
+            }
+        }, actorContext.getClientDispatcher());
+    }
+
+    private void setListenerRegistrationActor(final ActorSelection actor) {
+        if (actor == null) {
+            LOG.debug("Ignoring null actor on {}", this);
+            return;
+        }
+
+        synchronized (this) {
+            if (!isClosed()) {
+                this.listenerRegistrationActor = actor;
+                return;
+            }
+        }
+
+        // This registration has already been closed, notify the actor
+        actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
+    }
+
+    private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+
+        Future<Object> future = actorContext.executeOperationAsync(shard,
+                new RegisterDataTreeChangeListener(path, dataChangeListenerActor),
+                actorContext.getDatastoreContext().getShardInitializationTimeout());
+
+        future.onComplete(new OnComplete<Object>(){
+            @Override
+            public void onComplete(final Throwable failure, final Object result) {
+                if (failure != null) {
+                    LOG.error("Failed to register DataTreeChangeListener {} at path {}",
+                            getInstance(), path.toString(), failure);
+                } else {
+                    RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
+                    setListenerRegistrationActor(actorContext.actorSelection(
+                            reply.getListenerRegistrationPath().path()));
+                }
+            }
+        }, actorContext.getClientDispatcher());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerRegistrationActor.java
new file mode 100644 (file)
index 0000000..7d0117f
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Actor co-located with a shard. It exists only to terminate the registration when
+ * asked to do so via {@link CloseDataTreeChangeListenerRegistration}.
+ */
+public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor {
+    private final ListenerRegistration<DOMDataTreeChangeListener> registration;
+
+    public DataTreeChangeListenerRegistrationActor(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
+        this.registration = Preconditions.checkNotNull(registration);
+    }
+
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (message instanceof CloseDataTreeChangeListenerRegistration) {
+            registration.close();
+            getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf());
+            getSelf().tell(PoisonPill.getInstance(), getSelf());
+        }
+    }
+
+    public static Props props(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
+        return Props.create(new DataTreeChangeListenerRegistrationCreator(registration));
+    }
+
+    private static final class DataTreeChangeListenerRegistrationCreator implements Creator<DataTreeChangeListenerRegistrationActor> {
+        private static final long serialVersionUID = 1L;
+        final ListenerRegistration<DOMDataTreeChangeListener> registration;
+
+        DataTreeChangeListenerRegistrationCreator(ListenerRegistration<DOMDataTreeChangeListener> registration) {
+            this.registration = Preconditions.checkNotNull(registration);
+        }
+
+        @Override
+        public DataTreeChangeListenerRegistrationActor create() {
+            return new DataTreeChangeListenerRegistrationActor(registration);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
new file mode 100644 (file)
index 0000000..afce4df
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
+    private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
+    private final Collection<ActorSelection> actors = new ArrayList<>();
+    private final Shard shard;
+
+    DataTreeChangeListenerSupport(final Shard shard) {
+        this.shard = Preconditions.checkNotNull(shard);
+    }
+
+    @Override
+    void onLeadershipChange(final boolean isLeader) {
+        if (isLeader) {
+            for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
+                reg.createDelegate(this);
+            }
+            delayedRegistrations.clear();
+            delayedRegistrations.trimToSize();
+        }
+
+        final EnableNotification msg = new EnableNotification(isLeader);
+        for (ActorSelection dataChangeListener : actors) {
+            dataChangeListener.tell(msg, shard.getSelf());
+        }
+    }
+
+    @Override
+    void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) {
+        LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", shard.persistenceId(), registerTreeChangeListener.getPath(), isLeader);
+
+        final ListenerRegistration<DOMDataTreeChangeListener> registration;
+        if (!isLeader) {
+            LOG.debug("{}: Shard is not the leader - delaying registration", shard.persistenceId());
+
+            DelayedDataTreeListenerRegistration delayedReg =
+                    new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
+            delayedRegistrations.add(delayedReg);
+            registration = delayedReg;
+        } else {
+            registration = createDelegate(registerTreeChangeListener);
+        }
+
+        ActorRef listenerRegistration = shard.getContext().actorOf(
+                DataTreeChangeListenerRegistrationActor.props(registration));
+
+        LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+            shard.persistenceId(), listenerRegistration.path());
+
+        shard.getSender().tell(new RegisterDataTreeChangeListenerReply(listenerRegistration), shard.getSelf());
+    }
+
+    @Override
+    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(final RegisterDataTreeChangeListener message) {
+        ActorSelection dataChangeListenerPath = shard.getContext().system().actorSelection(
+            message.getDataTreeChangeListenerPath().path());
+
+        // Notify the listener if notifications should be enabled or not
+        // If this shard is the leader then it will enable notifications else
+        // it will not
+        dataChangeListenerPath.tell(new EnableNotification(true), shard.getSelf());
+
+        // Now store a reference to the data change listener so it can be notified
+        // at a later point if notifications should be enabled or disabled
+        actors.add(dataChangeListenerPath);
+
+        DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
+
+        LOG.debug("{}: Registering for path {}", shard.persistenceId(), message.getPath());
+
+        return shard.getDataStore().registerTreeChangeListener(message.getPath(), listener);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java
new file mode 100644 (file)
index 0000000..b3ae8a3
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Intermediate proxy registration returned to the user when we cannot
+ * instantiate the registration immediately. It provides a bridge to
+ * a real registration which may materialize at some point in the future.
+ */
+final class DelayedDataTreeListenerRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
+    private final RegisterDataTreeChangeListener registerTreeChangeListener;
+    private volatile ListenerRegistration<DOMDataTreeChangeListener> delegate;
+    @GuardedBy("this")
+    private boolean closed;
+
+    DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
+        this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
+    }
+
+    synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> factory) {
+        if (!closed) {
+            this.delegate = factory.createDelegate(registerTreeChangeListener);
+        }
+    }
+
+    @Override
+    public DOMDataTreeChangeListener getInstance() {
+        final ListenerRegistration<DOMDataTreeChangeListener> d = delegate;
+        return d == null ? null : d.getInstance();
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!closed) {
+            closed = true;
+            if (delegate != null) {
+                delegate.close();
+            }
+        }
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
new file mode 100644 (file)
index 0000000..e6702d9
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Base class for factories instantiating delegates.
+ *
+ * <D> delegate type
+ * <M> message type
+ */
+abstract class DelegateFactory<M, D> {
+    abstract D createDelegate(M message);
+}
index c79de945675a0f14d8a40fd6dc13f4007a3a9669..69c127f2897017f218222b2a95b270b1bcc9f0de 100644 (file)
@@ -21,10 +21,12 @@ import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class DistributedDataStore implements DOMStore, SchemaContextListener,
  *
  */
 public class DistributedDataStore implements DOMStore, SchemaContextListener,
-        DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
+        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     private static final String UNKNOWN_TYPE = "unknown";
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
     private static final String UNKNOWN_TYPE = "unknown";
@@ -125,6 +127,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         return listenerRegistrationProxy;
     }
 
         return listenerRegistrationProxy;
     }
 
+    @Override
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) {
+        Preconditions.checkNotNull(treeId, "treeId should not be null");
+        Preconditions.checkNotNull(listener, "listener should not be null");
+
+        final String shardName = ShardStrategyFactory.getStrategy(treeId).findShard(treeId);
+        LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
+
+        final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
+                new DataTreeChangeListenerProxy<L>(actorContext, listener);
+        listenerRegistrationProxy.init(shardName, treeId);
+
+        return listenerRegistrationProxy;
+    }
+
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return new TransactionChainProxy(actorContext);
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return new TransactionChainProxy(actorContext);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ForwardingDataTreeChangeListener.java
new file mode 100644 (file)
index 0000000..7ca21d4
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Internal implementation of a {@link DOMDataTreeChangeListener} which
+ * encapsulates received notifications into a {@link DataTreeChanged}
+ * message and forwards them towards the client's {@link DataTreeChangeListenerActor}.
+ */
+final class ForwardingDataTreeChangeListener implements DOMDataTreeChangeListener {
+    private final ActorSelection actor;
+
+    ForwardingDataTreeChangeListener(final ActorSelection actor) {
+        this.actor = Preconditions.checkNotNull(actor, "actor should not be null");
+    }
+
+    @Override
+    public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+        actor.tell(new DataTreeChanged(changes), ActorRef.noSender());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
new file mode 100644 (file)
index 0000000..891c0bf
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Base class for factories instantiating delegates which are local to the
+ * shard leader.
+ *
+ * <D> delegate type
+ * <M> message type
+ */
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+    /**
+     * Invoked whenever the local shard's leadership role changes.
+     *
+     * @param isLeader true if the shard has become leader, false if it has
+     *                 become a follower.
+     */
+    abstract void onLeadershipChange(boolean isLeader);
+    abstract void onMessage(M message, boolean isLeader);
+}
index c04256a28efb5b01dd94429f81cb72b71ae3e1eb..6868cc15cd8b21dd899d3b066142fc7fbeefce23 100644 (file)
@@ -56,6 +56,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
@@ -141,6 +142,8 @@ public class Shard extends RaftActor {
 
     private final String txnDispatcherPath;
 
 
     private final String txnDispatcherPath;
 
+    private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
+
     protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
     protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
@@ -158,7 +161,7 @@ public class Shard extends RaftActor {
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
 
         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
                 datastoreContext.getDataStoreProperties());
 
-        if(schemaContext != null) {
+        if (schemaContext != null) {
             store.onGlobalContextUpdated(schemaContext);
         }
 
             store.onGlobalContextUpdated(schemaContext);
         }
 
@@ -272,6 +275,8 @@ public class Shard extends RaftActor {
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
             } else if (message instanceof RegisterChangeListener) {
                 registerChangeListener((RegisterChangeListener) message);
                 closeTransactionChain(CloseTransactionChain.fromSerializable(message));
             } else if (message instanceof RegisterChangeListener) {
                 registerChangeListener((RegisterChangeListener) message);
+            } else if (message instanceof RegisterDataTreeChangeListener) {
+                treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
             } else if (message instanceof UpdateSchemaContext) {
                 updateSchemaContext((UpdateSchemaContext) message);
             } else if (message instanceof PeerAddressResolved) {
             } else if (message instanceof UpdateSchemaContext) {
                 updateSchemaContext((UpdateSchemaContext) message);
             } else if (message instanceof PeerAddressResolved) {
@@ -826,6 +831,8 @@ public class Shard extends RaftActor {
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
+        treeChangeSupport.onLeadershipChange(isLeader);
+
         for (ActorSelection dataChangeListener : dataChangeListeners) {
             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
         }
         for (ActorSelection dataChangeListener : dataChangeListeners) {
             dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistration.java
new file mode 100644 (file)
index 0000000..032f4c1
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class CloseDataTreeChangeListenerRegistration implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final CloseDataTreeChangeListenerRegistration INSTANCE = new CloseDataTreeChangeListenerRegistration();
+
+    private CloseDataTreeChangeListenerRegistration() {
+    }
+
+    public static CloseDataTreeChangeListenerRegistration getInstance() {
+        return INSTANCE;
+    }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CloseDataTreeChangeListenerRegistrationReply.java
new file mode 100644 (file)
index 0000000..9d83fac
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE = new CloseDataTreeChangeListenerRegistrationReply();
+
+    private CloseDataTreeChangeListenerRegistrationReply() {
+        // Use getInstance() instead
+    }
+
+    public static CloseDataTreeChangeListenerRegistrationReply getInstance() {
+        return INSTANCE;
+    }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChanged.java
new file mode 100644 (file)
index 0000000..919f944
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * A message about a DataTree having been changed. The message is not
+ * serializable on purpose. For delegating the change across cluster nodes,
+ * this needs to be intercepted by a local agent and forwarded as
+ * a {@link DataTreeDelta}.
+ */
+public final class DataTreeChanged {
+    private final Collection<DataTreeCandidate> changes;
+
+    public DataTreeChanged(final Collection<DataTreeCandidate> changes) {
+        this.changes = Preconditions.checkNotNull(changes);
+    }
+
+    /**
+     * Return the data changes.
+     *
+     * @return Change events
+     */
+    public Collection<DataTreeCandidate> getChanges() {
+        return changes;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataTreeChangedReply.java
new file mode 100644 (file)
index 0000000..e4a8d74
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class DataTreeChangedReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final DataTreeChangedReply INSTANCE = new DataTreeChangedReply();
+
+    private DataTreeChangedReply() {
+        // Use getInstance() instead
+    }
+
+    public static DataTreeChangedReply getInstance() {
+        return INSTANCE;
+    }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListener.java
new file mode 100644 (file)
index 0000000..941336e
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard
+ * leader.
+ */
+public final class RegisterDataTreeChangeListener implements Externalizable {
+    private static final long serialVersionUID = 1L;
+    private ActorRef dataTreeChangeListenerPath;
+    private YangInstanceIdentifier path;
+
+    public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) {
+        this.path = Preconditions.checkNotNull(path);
+        this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath);
+    }
+
+    public YangInstanceIdentifier getPath() {
+        return path;
+    }
+
+    public ActorRef getDataTreeChangeListenerPath() {
+        return dataTreeChangeListenerPath;
+    }
+
+    @Override
+    public void writeExternal(final ObjectOutput out) throws IOException {
+        out.writeObject(dataTreeChangeListenerPath);
+        SerializationUtils.serializePath(path, out);
+    }
+
+    @Override
+    public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+        dataTreeChangeListenerPath = (ActorRef) in.readObject();
+        path = SerializationUtils.deserializePath(in);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterDataTreeChangeListenerReply.java
new file mode 100644 (file)
index 0000000..88682ae
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * Successful reply to a {@link RegisterDataTreeChangeListener} request.
+ */
+public final class RegisterDataTreeChangeListenerReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final ActorRef listenerRegistrationPath;
+
+    public RegisterDataTreeChangeListenerReply(final ActorRef listenerRegistrationPath) {
+        this.listenerRegistrationPath = Preconditions.checkNotNull(listenerRegistrationPath);
+    }
+
+    public ActorRef getListenerRegistrationPath() {
+        return listenerRegistrationPath;
+    }
+}