RemoteDeviceDataBroker proxy 18/30718/4
authorTomas Cere <tcere@cisco.com>
Mon, 30 Nov 2015 16:15:42 +0000 (17:15 +0100)
committerTomas Cere <tcere@cisco.com>
Tue, 15 Dec 2015 09:33:30 +0000 (10:33 +0100)
Change-Id: Ifdb7b905f95dc2af5e31003e59405ebc446d85d3
Signed-off-by: Tomas Cere <tcere@cisco.com>
25 files changed:
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManager.java
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/NodeManagerCallback.java
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/RoleChangeStrategy.java
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/example/ExampleNodeManagerCallback.java
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/BaseNodeManager.java
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NodeRoleChangeStrategy.java
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/NoopRoleChangeStrategy.java
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/TopologyRoleChangeStrategy.java
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPoint.java [new file with mode: 0644]
opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPointDown.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/NetconfTopology.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/ClusteredNetconfTopology.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfNodeManagerCallback.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyImpl.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/NetconfTopologyManagerCallback.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/impl/TopologyNodeWriter.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceMasterDataBroker.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceSlaveDataBroker.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ProxyNetconfDeviceDataBroker.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/TopologyMountPointFacade.java
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/NetconfDeviceDataBrokerProxy.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyReadOnlyTransaction.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyWriteOnlyTransaction.java [new file with mode: 0644]
opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/ActorTest.java
opendaylight/netconf/netconf-topology/src/test/java/org/opendaylight/netconf/topology/TestingTopologyDispatcher.java

index b4b26a859a48adf21dfdafe006e158f00749e406..c8ca14d252a30038e470f82f03502cb5b618a8fe 100644 (file)
@@ -10,11 +10,13 @@ package org.opendaylight.netconf.topology;
 
 import akka.actor.TypedActor.Receiver;
 import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 
 /**
  * Node manager that handles communication between node managers and delegates calls to the customizable NodeManagerCallback
  */
 @Beta
-public interface NodeManager extends InitialStateProvider, NodeListener, Receiver, RemoteNodeListener {
+public interface NodeManager extends InitialStateProvider, NodeListener, Receiver, RemoteNodeListener, RemoteDeviceHandler<NetconfSessionPreferences> {
 
 }
index fd0444aa5c207639c2fb6f6390eba69f83155858..670bb81ebe1d2ac4fbb487367af04938e2eddfb4 100644 (file)
@@ -11,12 +11,14 @@ package org.opendaylight.netconf.topology;
 import akka.actor.ActorSystem;
 import akka.actor.TypedActor.Receiver;
 import com.google.common.annotations.Beta;
+import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 
 /**
  * Customizable layer that handles communication with your application.
  */
 @Beta
-public interface NodeManagerCallback extends InitialStateProvider, NodeListener, Receiver {
+public interface NodeManagerCallback extends InitialStateProvider, NodeListener, Receiver, RemoteDeviceHandler<NetconfSessionPreferences> {
 
     interface NodeManagerCallbackFactory<M> {
         NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem);
index 39cccf653c6ee54a280a0aa4aebe49c4515d7676..f4f3013b7f9be842b14e03521c8ba30e9ad33879 100644 (file)
@@ -30,4 +30,10 @@ public interface RoleChangeStrategy extends RoleChangeListener {
      */
     void unregisterRoleCandidate();
 
+    /**
+     *
+     * @return True/False based on if this candidate is already registered into ownership service
+     */
+    boolean isCandidateRegistered();
+
 }
index 05b69fc79a40d6049072f08c2686057eb58e7922..823c44b47aa552bf8420d1eff71f13e106a00e60 100644 (file)
@@ -12,6 +12,9 @@ import akka.actor.ActorRef;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.topology.NodeManagerCallback;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
@@ -19,6 +22,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev15
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class ExampleNodeManagerCallback implements NodeManagerCallback {
 
@@ -70,4 +74,29 @@ public class ExampleNodeManagerCallback implements NodeManagerCallback {
     public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
 
     }
+
+    @Override
+    public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+
+    }
+
+    @Override
+    public void onDeviceDisconnected() {
+
+    }
+
+    @Override
+    public void onDeviceFailed(Throwable throwable) {
+
+    }
+
+    @Override
+    public void onNotification(DOMNotification domNotification) {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
 }
index 03151505f72c3aa6aa1ab01af487dd618e9f9c24..2f8b1b7f6220cc28cfbff9064f39caa30b281f44 100644 (file)
@@ -17,6 +17,9 @@ import akka.japi.Creator;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.topology.NodeManager;
 import org.opendaylight.netconf.topology.NodeManagerCallback;
 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
@@ -24,6 +27,7 @@ import org.opendaylight.netconf.topology.RoleChangeStrategy;
 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -36,7 +40,7 @@ public final class BaseNodeManager implements NodeManager {
     private final String topologyId;
     private final ActorSystem actorSystem;
 
-    private boolean isMaster;
+    private boolean isMaster = false;
     private NodeManagerCallback delegate;
 
     private BaseNodeManager(final String nodeId,
@@ -99,7 +103,7 @@ public final class BaseNodeManager implements NodeManager {
 
     @Override
     public void onReceive(Object o, ActorRef actorRef) {
-
+        delegate.onReceive(o, actorRef);
     }
 
     @Override
@@ -122,6 +126,31 @@ public final class BaseNodeManager implements NodeManager {
         return null;
     }
 
+    @Override
+    public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+        delegate.onDeviceConnected(remoteSchemaContext, netconfSessionPreferences, deviceRpc);
+    }
+
+    @Override
+    public void onDeviceDisconnected() {
+        delegate.onDeviceDisconnected();
+    }
+
+    @Override
+    public void onDeviceFailed(Throwable throwable) {
+        delegate.onDeviceFailed(throwable);
+    }
+
+    @Override
+    public void onNotification(DOMNotification domNotification) {
+        delegate.onNotification(domNotification);
+    }
+
+    @Override
+    public void close() {
+        // NOOP
+    }
+
     /**
      * Builder of BaseNodeManager instances that are proxied as TypedActors
      */
index 55bdb2e2dd071e548c387337b28dd2388e7c74f4..3cafd35b55f73391d35ac712c65cbc1eba9c85fa 100644 (file)
@@ -27,6 +27,7 @@ public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnersh
     private final EntityOwnershipService entityOwnershipService;
     private final String entityType;
     private final String entityName;
+    private final Entity entity;
     private NodeListener ownershipCandidate;
 
     private EntityOwnershipCandidateRegistration candidateRegistration = null;
@@ -38,6 +39,7 @@ public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnersh
         this.entityOwnershipService = entityOwnershipService;
         this.entityType = entityType + "/" + entityName;
         this.entityName = entityName;
+        this.entity = new Entity(this.entityType, entityName);
     }
 
     @Override
@@ -48,7 +50,7 @@ public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnersh
             if (candidateRegistration != null) {
                 unregisterRoleCandidate();
             }
-            candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+            candidateRegistration = entityOwnershipService.registerCandidate(entity);
             ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
         } catch (CandidateAlreadyRegisteredException e) {
             LOG.error("Candidate already registered for election", e);
@@ -65,6 +67,11 @@ public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnersh
         ownershipListenerRegistration = null;
     }
 
+    @Override
+    public boolean isCandidateRegistered() {
+        return entityOwnershipService.isCandidateRegistered(entity);
+    }
+
     @Override
     public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
         LOG.debug("Role was changed {}", roleChangeDTO);
index ab76cc2d64b01a71a9b9d08fdb984cd026470cb1..ea6e5d5eeb2a0850fde981857d0a711cefa04773 100644 (file)
@@ -27,6 +27,11 @@ public class NoopRoleChangeStrategy implements RoleChangeStrategy {
 
     }
 
+    @Override
+    public boolean isCandidateRegistered() {
+        return false;
+    }
+
     @Override
     public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
 
index de9f7aca39aa45180603f40bfb32cee4ae3a21c9..94cd8799af1122c56a26a038f65751f856fb28f7 100644 (file)
@@ -49,7 +49,7 @@ public class TopologyRoleChangeStrategy implements RoleChangeStrategy, Clustered
     private NodeListener ownershipCandidate;
     private final String entityType;
     // use topologyId as entityName
-    private final String entityName;
+    private final Entity entity;
 
     private EntityOwnershipCandidateRegistration candidateRegistration = null;
     private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
@@ -63,7 +63,7 @@ public class TopologyRoleChangeStrategy implements RoleChangeStrategy, Clustered
         this.dataBroker = dataBroker;
         this.entityOwnershipService = entityOwnershipService;
         this.entityType = entityType;
-        this.entityName = entityName;
+        this.entity = new Entity(entityType, entityName);
 
         datastoreListenerRegistration = null;
     }
@@ -76,7 +76,7 @@ public class TopologyRoleChangeStrategy implements RoleChangeStrategy, Clustered
             if (candidateRegistration != null) {
                 unregisterRoleCandidate();
             }
-            candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+            candidateRegistration = entityOwnershipService.registerCandidate(entity);
             ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
         } catch (CandidateAlreadyRegisteredException e) {
             LOG.error("Candidate already registered for election", e);
@@ -92,6 +92,11 @@ public class TopologyRoleChangeStrategy implements RoleChangeStrategy, Clustered
         ownershipListenerRegistration = null;
     }
 
+    @Override
+    public boolean isCandidateRegistered() {
+        return entityOwnershipService.isCandidateRegistered(entity);
+    }
+
     @Override
     public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
         if (roleChangeDTO.isOwner()) {
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPoint.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPoint.java
new file mode 100644 (file)
index 0000000..6624a28
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util.messages;
+
+import java.io.Serializable;
+
+public class AnnounceMasterMountPoint implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public AnnounceMasterMountPoint() {}
+}
diff --git a/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPointDown.java b/opendaylight/netconf/abstract-topology/src/main/java/org/opendaylight/netconf/topology/util/messages/AnnounceMasterMountPointDown.java
new file mode 100644 (file)
index 0000000..a7c3f87
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util.messages;
+
+import java.io.Serializable;
+
+public class AnnounceMasterMountPointDown implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public AnnounceMasterMountPointDown() {
+
+    }
+}
index c9b5409d20e24c2f970bdea7c8784918be4bf794..a8743ec56dbcbe37123e57732197cc48f531fbaf 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.netconf.topology;
 
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
@@ -28,7 +30,20 @@ public interface NetconfTopology {
 
     ListenableFuture<Void> disconnectNode(NodeId nodeId);
 
-    void registerMountPoint(NodeId nodeId);
+    /**
+     * register master mount point
+     * @param context
+     * @param nodeId
+     */
+    void registerMountPoint(ActorContext context, NodeId nodeId);
+
+    /**
+     * register slave mountpoint with the provided ActorRef
+     * @param context
+     * @param nodeId
+     * @param masterRef
+     */
+    void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef);
 
     void unregisterMountPoint(NodeId nodeId);
 
index 25b4e2d2f7f1ba9c1923d617654c59066fac16ae..bf03fe1eaa7c1215697a2f86aaa099e4ca9b38e5 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.netconf.topology.impl;
 
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.TypedActor;
 import akka.actor.TypedActorExtension;
@@ -123,16 +125,21 @@ public class ClusteredNetconfTopology extends AbstractNetconfTopology implements
 
     @Override
     protected RemoteDeviceHandler<NetconfSessionPreferences> createSalFacade(final RemoteDeviceId id, final Broker domBroker, final BindingAwareBroker bindingBroker, long defaultRequestTimeoutMillis) {
-        return new TopologyMountPointFacade(id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
+        return new TopologyMountPointFacade(topologyId, id, domBroker, bindingBroker, defaultRequestTimeoutMillis);
     }
 
     @Override
-    public void registerMountPoint(NodeId nodeId) {
-        ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint();
+    public void registerMountPoint(final ActorContext context, final NodeId nodeId) {
+        ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context);
     }
 
     @Override
-    public void unregisterMountPoint(NodeId nodeId) {
+    public void registerMountPoint(final ActorContext context, final NodeId nodeId, final ActorRef masterRef) {
+        ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).registerMountPoint(actorSystem, context, masterRef);
+    }
+
+    @Override
+    public void unregisterMountPoint(final NodeId nodeId) {
         Preconditions.checkState(activeConnectors.containsKey(nodeId), "Cannot unregister nonexistent mountpoint");
         ((TopologyMountPointFacade) activeConnectors.get(nodeId).getFacade()).unregisterMountPoint();
     }
index a17650a6fb0403a30eeb33f445dd671682c548e2..5f0eb2e179ffe910b1947375984d2e400833aae4 100644 (file)
@@ -8,11 +8,13 @@
 
 package org.opendaylight.netconf.topology.impl;
 
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.TypedActor;
 import akka.actor.TypedProps;
 import akka.cluster.Cluster;
+import akka.cluster.Member;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
@@ -29,15 +31,18 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.NodeManager;
 import org.opendaylight.netconf.topology.NodeManagerCallback;
 import org.opendaylight.netconf.topology.RoleChangeStrategy;
 import org.opendaylight.netconf.topology.TopologyManager;
 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.util.BaseNodeManager;
 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
@@ -60,7 +65,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler<NetconfSessionPreferences>{
+public class NetconfNodeManagerCallback implements NodeManagerCallback{
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
 
@@ -92,12 +97,18 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
     private String nodeId;
     private String topologyId;
     private TopologyManager topologyManager;
+    private NodeManager nodeManager;
+    // cached context so that we can use it in callbacks from topology
+    private ActorContext cachedContext;
 
     private Node currentConfig;
     private Node currentOperationalNode;
 
     private ConnectionStatusListenerRegistration registration = null;
 
+    private ActorRef masterDataBrokerRef = null;
+    private boolean connected = false;
+
     public NetconfNodeManagerCallback(final String nodeId,
                                       final String topologyId,
                                       final ActorSystem actorSystem,
@@ -123,6 +134,18 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
                 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
             }
         }, actorSystem.dispatcher());
+
+        final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
+        nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                if (throwable != null) {
+                    LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
+                }
+                LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
+                nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
+            }
+        }, actorSystem.dispatcher());
     }
 
 
@@ -183,6 +206,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
 
     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
                                                                    @Nonnull final Node configNode) {
+        cachedContext = TypedActor.context();
         this.nodeId = nodeId.getValue();
         this.currentConfig = configNode;
         // set initial state before anything happens
@@ -194,7 +218,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
             @Override
             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
-                registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+                registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
             }
 
             @Override
@@ -284,7 +308,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
                                         .build())
-                                .build();
+                        .build();
             }
         });
     }
@@ -306,16 +330,20 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
 
     @Override
     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
-        if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) {
-            return;
-        }
+        topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+
         isMaster = roleChangeDTO.isOwner();
-        //TODO instead of registering mount point, init remote schema repo when its done
         if (isMaster) {
-            // unregister old mountPoint if ownership changed, register a new one
-            topologyDispatcher.registerMountPoint(new NodeId(nodeId));
+            LOG.warn("Gained ownership of node - registering master mount point");
+            topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
         } else {
-            topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+            // even though mount point is ready, we dont know who the master mount point will be since we havent received the announce msg
+            // after we receive the message we can go ahead and register the mount point
+            if (connected && masterDataBrokerRef != null) {
+                topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+            } else {
+                LOG.debug("Mount point is ready, still waiting for master mount point");
+            }
         }
     }
 
@@ -323,7 +351,13 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
         LOG.debug("onDeviceConnected received, registering role candidate");
-        roleChangeStrategy.registerRoleCandidate(this);
+        connected = true;
+        roleChangeStrategy.registerRoleCandidate(nodeManager);
+        if (!isMaster && masterDataBrokerRef != null) {
+            // if we're not master but one is present already, we need to register mountpoint
+            LOG.warn("Device connected, master already present in topology, registering mount point");
+            topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
+        }
         List<String> capabilityList = new ArrayList<>();
         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
         capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
@@ -354,17 +388,26 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
                                 .setUnavailableCapabilities(unavailableCapabilities)
                                 .build())
                 .build();
-        // TODO need to implement forwarding of this msg to master
         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
     }
 
     @Override
     public void onDeviceDisconnected() {
         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
-        // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
         LOG.debug("onDeviceDisconnected received, unregistering role candidate");
-        topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+        connected = false;
+        if (isMaster) {
+            // announce that master mount point is going down
+            for (final Member member : clusterExtension.state().getMembers()) {
+                actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
+            }
+            // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
+            isMaster = false;
+            // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
+            topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+        }
         roleChangeStrategy.unregisterRoleCandidate();
+
         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
                 .addAugmentation(NetconfNode.class,
@@ -382,7 +425,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
                                 .setHost(netconfNode.getHost())
                                 .setPort(netconfNode.getPort())
                                 .build()).build();
-        // TODO need to implement forwarding of this msg to master
         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
     }
 
@@ -391,6 +433,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
         LOG.debug("onDeviceFailed received");
+        connected = false;
         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
 
         roleChangeStrategy.unregisterRoleCandidate();
@@ -412,7 +455,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
     }
 
-
     @Override
     public void onNotification(DOMNotification domNotification) {
         //NOOP
@@ -424,7 +466,20 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
     }
 
     @Override
-    public void onReceive(Object o, ActorRef actorRef) {
-
+    public void onReceive(Object message, ActorRef actorRef) {
+        LOG.warn("Netconf node callback received message {}", message);
+        if (message instanceof AnnounceMasterMountPoint) {
+            masterDataBrokerRef = actorRef;
+            // candidate gets registered when mount point is already prepared so we can go ahead a register it
+            if (roleChangeStrategy.isCandidateRegistered()) {
+                topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+            } else {
+                LOG.warn("Announce master mount point msg received but mount point is not ready yet");
+            }
+        } else if (message instanceof AnnounceMasterMountPointDown) {
+            LOG.warn("Master mountpoint went down");
+            masterDataBrokerRef = null;
+            topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+        }
     }
 }
\ No newline at end of file
index 4329063b8a3486eff734abf59f222f6443d8a6a6..73d238d1d3cf1edde28367fa51e381239d9f5813 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.netconf.topology.impl;
 
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
 import io.netty.util.concurrent.EventExecutor;
 import java.util.Collection;
 import javax.annotation.Nonnull;
@@ -72,7 +74,12 @@ public class NetconfTopologyImpl extends AbstractNetconfTopology implements Data
     }
 
     @Override
-    public void registerMountPoint(NodeId nodeId) {
+    public void registerMountPoint(ActorContext context, NodeId nodeId) {
+        throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
+    }
+
+    @Override
+    public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) {
         throw new UnsupportedOperationException("MountPoint registration is not supported in regular topology, this happens automaticaly in the netconf pipeline");
     }
 
index fecc93f72bce88a046017229ded0a8b33e2c65d0..eaa5d6cab744d6afc1a8d0b4f12d414bb718d2e3 100644 (file)
@@ -83,8 +83,10 @@ public class NetconfTopologyManagerCallback implements TopologyManagerCallback {
                 createNodeManager(nodeId);
         nodes.put(nodeId, naBaseNodeManager);
 
-        // put initial state into datastore
-        naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+        // only master should put initial state into datastore
+        if (isMaster) {
+            naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+        }
 
         // trigger connect on this node
         return naBaseNodeManager.onNodeCreated(nodeId, node);
@@ -92,8 +94,10 @@ public class NetconfTopologyManagerCallback implements TopologyManagerCallback {
 
     @Override
     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
-        // put initial state into datastore
-        naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+        // only master should put initial state into datastore
+        if (isMaster) {
+            naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+        }
 
         // Trigger onNodeUpdated only on this node
         return nodes.get(nodeId).onNodeUpdated(nodeId, node);
index d652b114d01360b0199345a31a2ab4e7007d764d..563ce977b290cd34fa757c557019c15f34396998 100644 (file)
@@ -134,7 +134,6 @@ public class TopologyNodeWriter implements NodeWriter{
                 throw new IllegalStateException(id.getValue() + "  Transaction(" + txType + ") not committed correctly", t);
             }
         });
-
     }
 
     private void createNetworkTopologyIfNotPresent(final WriteTransaction writeTx) {
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceMasterDataBroker.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceMasterDataBroker.java
new file mode 100644 (file)
index 0000000..f210820
--- /dev/null
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
+import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+public class NetconfDeviceMasterDataBroker implements ProxyNetconfDeviceDataBroker {
+
+    private final RemoteDeviceId id;
+
+    private final NetconfDeviceDataBroker delegateBroker;
+    private final ActorSystem actorSystem;
+
+    private DOMDataReadOnlyTransaction readTx;
+    private DOMDataWriteTransaction writeTx;
+
+    public NetconfDeviceMasterDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
+                                         final SchemaContext schemaContext, final DOMRpcService rpc,
+                                         final NetconfSessionPreferences netconfSessionPreferences, final long requestTimeoutMillis) {
+        this.id = id;
+        delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences, requestTimeoutMillis);
+        this.actorSystem = actorSystem;
+
+        // only ever need 1 readTx since it doesnt need to be closed
+        readTx = delegateBroker.newReadOnlyTransaction();
+    }
+
+    @Override
+    public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+        return new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self());
+    }
+
+    @Override
+    public DOMDataReadWriteTransaction newReadWriteTransaction() {
+        return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self()),
+                newWriteOnlyTransaction());
+    }
+
+    @Override
+    public DOMDataWriteTransaction newWriteOnlyTransaction() {
+        writeTx = delegateBroker.newWriteOnlyTransaction();
+        return new ProxyWriteOnlyTransaction(actorSystem, TypedActor.<NetconfDeviceMasterDataBroker>self());
+    }
+
+    @Override
+    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) {
+        throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
+        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+    }
+
+    @Nonnull
+    @Override
+    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
+
+        final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
+        Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+            @Override
+            public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+                if (!result.isPresent()) {
+                    promise.success(Optional.<NormalizedNodeMessage>absent());
+                } else {
+                    promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                promise.failure(t);
+            }
+        });
+        return promise.future();
+    }
+
+    @Override
+    public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
+
+        final DefaultPromise<Boolean> promise = new DefaultPromise<>();
+        Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(Boolean result) {
+                promise.success(result);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                promise.failure(t);
+            }
+        });
+        return promise.future();
+    }
+
+    @Override
+    public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        if (writeTx == null) {
+            writeTx = delegateBroker.newWriteOnlyTransaction();
+        }
+        writeTx.put(store, data.getIdentifier(), data.getNode());
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
+        if (writeTx == null) {
+            writeTx = delegateBroker.newWriteOnlyTransaction();
+        }
+        writeTx.merge(store, data.getIdentifier(), data.getNode());
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        if (writeTx == null) {
+            writeTx = delegateBroker.newWriteOnlyTransaction();
+        }
+        writeTx.delete(store, path);
+    }
+
+    @Override
+    public boolean cancel() {
+        return writeTx.cancel();
+    }
+
+    @Override
+    public Future<Void> submit() {
+        final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
+        final DefaultPromise<Void> promise = new DefaultPromise<>();
+        Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void result) {
+                promise.success(result);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                promise.failure(t);
+            }
+        });
+        return promise.future();
+    }
+
+    @Override
+    @Deprecated
+    public Future<RpcResult<TransactionStatus>> commit() {
+        final ListenableFuture<RpcResult<TransactionStatus>> commitFuture = writeTx.commit();
+        final DefaultPromise<RpcResult<TransactionStatus>> promise = new DefaultPromise<>();
+        Futures.addCallback(commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
+            @Override
+            public void onSuccess(RpcResult<TransactionStatus> result) {
+                promise.success(result);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                promise.failure(t);
+            }
+        });
+        return promise.future();
+    }
+
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceSlaveDataBroker.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/NetconfDeviceSlaveDataBroker.java
new file mode 100644 (file)
index 0000000..23d7e10
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorSystem;
+import java.util.Collections;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction;
+import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public class NetconfDeviceSlaveDataBroker implements DOMDataBroker{
+
+    private final RemoteDeviceId id;
+    private final ProxyNetconfDeviceDataBroker masterDataBroker;
+    private final ActorSystem actorSystem;
+
+    public NetconfDeviceSlaveDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id, final ProxyNetconfDeviceDataBroker masterDataBroker) {
+        this.id = id;
+        this.masterDataBroker = masterDataBroker;
+        this.actorSystem = actorSystem;
+    }
+
+    @Override
+    public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+        return new ProxyReadOnlyTransaction(actorSystem, id, masterDataBroker);
+    }
+
+    @Override
+    public DOMDataReadWriteTransaction newReadWriteTransaction() {
+        return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, masterDataBroker), new ProxyWriteOnlyTransaction(actorSystem, masterDataBroker));
+    }
+
+    @Override
+    public DOMDataWriteTransaction newWriteOnlyTransaction() {
+        return new ProxyWriteOnlyTransaction(actorSystem, masterDataBroker);
+    }
+
+    @Override
+    public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) {
+        throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
+        throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
+    }
+
+    @Nonnull
+    @Override
+    public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+        return Collections.emptyMap();
+    }
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ProxyNetconfDeviceDataBroker.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ProxyNetconfDeviceDataBroker.java
new file mode 100644 (file)
index 0000000..3ef7688
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.Future;
+
+public interface ProxyNetconfDeviceDataBroker extends DOMDataBroker{
+    Future<Optional<NormalizedNodeMessage>> read(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    Future<Boolean> exists(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    void put(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+    void merge(LogicalDatastoreType store, NormalizedNodeMessage data);
+
+    void delete(LogicalDatastoreType store, YangInstanceIdentifier path);
+
+    boolean cancel();
+
+    Future<Void> submit();
+
+    @Deprecated
+    Future<RpcResult<TransactionStatus>> commit();
+}
index ec06c04b92f9e1fc9ac94885ad7b9e8e64ab7fc0..ec294b268f4528e94f5b9e495980a818a6a29165 100644 (file)
@@ -8,6 +8,14 @@
 
 package org.opendaylight.netconf.topology.pipeline;
 
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.japi.Creator;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
@@ -17,9 +25,9 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceNotificationService;
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +36,9 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand
 
     private static final Logger LOG = LoggerFactory.getLogger(TopologyMountPointFacade.class);
 
+    private static final String MOUNT_POINT = "mountpoint";
+
+    private final String topologyId;
     private final RemoteDeviceId id;
     private final Broker domBroker;
     private final BindingAwareBroker bindingBroker;
@@ -38,13 +49,17 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand
     private DOMRpcService deviceRpc = null;
     private final ClusteredNetconfDeviceMountInstanceProxy salProvider;
 
+    private ActorSystem actorSystem;
+    private DOMDataBroker deviceDataBroker = null;
+
     private final ArrayList<RemoteDeviceHandler<NetconfSessionPreferences>> connectionStatusListeners = new ArrayList<>();
 
-    public TopologyMountPointFacade(final RemoteDeviceId id,
+    public TopologyMountPointFacade(final String topologyId,
+                                    final RemoteDeviceId id,
                                     final Broker domBroker,
                                     final BindingAwareBroker bindingBroker,
                                     long defaultRequestTimeoutMillis) {
-
+        this.topologyId = topologyId;
         this.id = id;
         this.domBroker = domBroker;
         this.bindingBroker = bindingBroker;
@@ -91,19 +106,54 @@ public class TopologyMountPointFacade implements AutoCloseable, RemoteDeviceHand
         salProvider.getMountInstance().publish(domNotification);
     }
 
-    public void registerMountPoint() {
+    public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context) {
         Preconditions.checkNotNull(id);
         Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
         Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+        this.actorSystem = actorSystem;
+        final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
-        final DOMDataBroker netconfDeviceDataBroker = new NetconfDeviceDataBroker(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+        LOG.warn("Creating master data broker for device {}", id);
+        deviceDataBroker = TypedActor.get(context).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, new Creator<NetconfDeviceMasterDataBroker>() {
+            @Override
+            public NetconfDeviceMasterDataBroker create() throws Exception {
+                return new NetconfDeviceMasterDataBroker(actorSystem, id, remoteSchemaContext, deviceRpc, netconfSessionPreferences, defaultRequestTimeoutMillis);
+            }
+        }), MOUNT_POINT);
+        LOG.warn("Master data broker registered on path {}", TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker).path());
+        salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
+        final Cluster cluster = Cluster.get(actorSystem);
+        final Iterable<Member> members = cluster.state().getMembers();
+        final ActorRef deviceDataBrokerRef = TypedActor.get(actorSystem).getActorRefFor(deviceDataBroker);
+        for (final Member member : members) {
+            if (!member.address().equals(cluster.selfAddress())) {
+                final String path = member.address() + "/user/" + topologyId + "/" + id.getName();
+                actorSystem.actorSelection(path).tell(new AnnounceMasterMountPoint(), deviceDataBrokerRef);
+            }
+        }
+    }
+
+    public void registerMountPoint(final ActorSystem actorSystem, final ActorContext context, final ActorRef masterRef) {
+        Preconditions.checkNotNull(id);
+        Preconditions.checkNotNull(remoteSchemaContext, "Device has no remote schema context yet. Probably not fully connected.");
+        Preconditions.checkNotNull(netconfSessionPreferences, "Device has no capabilities yet. Probably not fully connected.");
+        this.actorSystem = actorSystem;
         final NetconfDeviceNotificationService notificationService = new NetconfDeviceNotificationService();
 
-        salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, netconfDeviceDataBroker, deviceRpc, notificationService);
+        LOG.warn("Creating a proxy for master data broker");
+        final ProxyNetconfDeviceDataBroker masterDataBroker = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(ProxyNetconfDeviceDataBroker.class, NetconfDeviceMasterDataBroker.class), masterRef);
+        LOG.warn("Creating slave data broker for device {}", id);
+        final DOMDataBroker deviceDataBroker = new NetconfDeviceSlaveDataBroker(actorSystem, id, masterDataBroker);
+        salProvider.getMountInstance().onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
     }
 
     public void unregisterMountPoint() {
         salProvider.getMountInstance().onTopologyDeviceDisconnected();
+        if (deviceDataBroker != null) {
+            LOG.warn("Stopping master data broker for device {}", id.getName());
+            TypedActor.get(actorSystem).stop(deviceDataBroker);
+            deviceDataBroker = null;
+        }
     }
 
     public ConnectionStatusListenerRegistration registerConnectionStatusListener(final RemoteDeviceHandler<NetconfSessionPreferences> listener) {
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/NetconfDeviceDataBrokerProxy.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/NetconfDeviceDataBrokerProxy.java
new file mode 100644 (file)
index 0000000..23fe120
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+public interface NetconfDeviceDataBrokerProxy {
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyReadOnlyTransaction.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyReadOnlyTransaction.java
new file mode 100644 (file)
index 0000000..c67673b
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.pipeline.ProxyNetconfDeviceDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+public class ProxyReadOnlyTransaction implements DOMDataReadOnlyTransaction{
+
+    private final RemoteDeviceId id;
+    private final ProxyNetconfDeviceDataBroker delegate;
+    private final ActorSystem actorSystem;
+
+    public ProxyReadOnlyTransaction(final ActorSystem actorSystem, final RemoteDeviceId id, final ProxyNetconfDeviceDataBroker delegate) {
+        this.id = id;
+        this.delegate = delegate;
+        this.actorSystem = actorSystem;
+    }
+
+    @Override
+    public void close() {
+        //NOOP
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        final Future<Optional<NormalizedNodeMessage>> future = delegate.read(store, path);
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+            @Nullable
+            @Override
+            public ReadFailedException apply(Exception cause) {
+                return new ReadFailedException("Read from transaction failed", cause);
+            }
+        });
+        future.onComplete(new OnComplete<Optional<NormalizedNodeMessage>>() {
+            @Override
+            public void onComplete(Throwable throwable, Optional<NormalizedNodeMessage> normalizedNodeMessage) throws Throwable {
+                if (throwable == null) {
+                    settableFuture.set(normalizedNodeMessage.transform(new Function<NormalizedNodeMessage, NormalizedNode<?, ?>>() {
+                        @Nullable
+                        @Override
+                        public NormalizedNode<?, ?> apply(NormalizedNodeMessage input) {
+                            return input.getNode();
+                        }
+                    }));
+                } else {
+                    settableFuture.setException(throwable);
+                }
+            }
+        }, actorSystem.dispatcher());
+        return checkedFuture;
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        final Future<Boolean> existsFuture = delegate.exists(store, path);
+        final SettableFuture<Boolean> settableFuture = SettableFuture.create();
+        final CheckedFuture<Boolean, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+            @Nullable
+            @Override
+            public ReadFailedException apply(Exception cause) {
+                return new ReadFailedException("Read from transaction failed", cause);
+            }
+        });
+        existsFuture.onComplete(new OnComplete<Boolean>() {
+            @Override
+            public void onComplete(Throwable throwable, Boolean result) throws Throwable {
+                if (throwable == null) {
+                    settableFuture.set(result);
+                } else {
+                    settableFuture.setException(throwable);
+                }
+            }
+        }, actorSystem.dispatcher());
+        return checkedFuture;
+    }
+
+    @Override
+    public Object getIdentifier() {
+        return this;
+    }
+}
diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyWriteOnlyTransaction.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/tx/ProxyWriteOnlyTransaction.java
new file mode 100644 (file)
index 0000000..d7a3c87
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.tx;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.netconf.topology.pipeline.ProxyNetconfDeviceDataBroker;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+public class ProxyWriteOnlyTransaction implements DOMDataWriteTransaction {
+
+    private final ProxyNetconfDeviceDataBroker delegate;
+    private final ActorSystem actorSystem;
+
+    public ProxyWriteOnlyTransaction(ActorSystem actorSystem, final ProxyNetconfDeviceDataBroker delegate) {
+        this.delegate = delegate;
+        this.actorSystem = actorSystem;
+    }
+
+    @Override
+    public void put (final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode < ?,?>data){
+        delegate.put(store, new NormalizedNodeMessage(path, data));
+    }
+
+    @Override
+    public void merge (final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode < ?,?>data){
+        delegate.merge(store, new NormalizedNodeMessage(path, data));
+    }
+
+    @Override
+    public boolean cancel () {
+        return delegate.cancel();
+    }
+
+    @Override
+    public void delete (final LogicalDatastoreType store, final YangInstanceIdentifier path){
+        delegate.delete(store, path);
+    }
+
+    @Override
+    public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+        final Future<Void> submit = delegate.submit();
+        final SettableFuture<Void> settableFuture = SettableFuture.create();
+        final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, TransactionCommitFailedException>() {
+            @Nullable
+            @Override
+            public TransactionCommitFailedException apply(Exception input) {
+                return new TransactionCommitFailedException("Transaction commit failed", input);
+            }
+        });
+        submit.onComplete(new OnComplete<Void>() {
+            @Override
+            public void onComplete(Throwable throwable, Void aVoid) throws Throwable {
+                if (throwable == null) {
+                    settableFuture.set(aVoid);
+                } else {
+                    settableFuture.setException(throwable);
+                }
+            }
+        }, actorSystem.dispatcher());
+        return checkedFuture;
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<TransactionStatus>> commit () {
+        final Future<RpcResult<TransactionStatus>> commit = delegate.commit();
+        final SettableFuture<RpcResult<TransactionStatus>> settableFuture = SettableFuture.create();
+        commit.onComplete(new OnComplete<RpcResult<TransactionStatus>>() {
+            @Override
+            public void onComplete(Throwable throwable, RpcResult<TransactionStatus> transactionStatusRpcResult) throws Throwable {
+                if (throwable == null) {
+                    settableFuture.set(transactionStatusRpcResult);
+                } else {
+                    settableFuture.setException(throwable);
+                }
+            }
+        }, actorSystem.dispatcher());
+        return settableFuture;
+    }
+
+    @Override
+    public Object getIdentifier () {
+        return this;
+    }
+}
index 392838c4b8eaeca62c88c90f916dcc5401f5168f..544cb87d21c37a80d36e20ba28c2424b23f38a72 100644 (file)
@@ -45,6 +45,9 @@ import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
 import org.opendaylight.netconf.topology.example.ExampleNodeManagerCallback;
@@ -486,6 +489,31 @@ public class ActorTest {
         public void onReceive(Object o, ActorRef actorRef) {
 
         }
+
+        @Override
+        public void onDeviceConnected(SchemaContext remoteSchemaContext, NetconfSessionPreferences netconfSessionPreferences, DOMRpcService deviceRpc) {
+
+        }
+
+        @Override
+        public void onDeviceDisconnected() {
+
+        }
+
+        @Override
+        public void onDeviceFailed(Throwable throwable) {
+
+        }
+
+        @Override
+        public void onNotification(DOMNotification domNotification) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
     }
 
     public static class TestingTopologyManagerCallback implements TopologyManagerCallback {
index 7931bb8c392e7136744002269dd14583419cc663..28a1f97122f78d8917768382b598e9e87d99a638 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.netconf.topology;
 
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -108,9 +110,13 @@ public class TestingTopologyDispatcher implements NetconfTopology{
     }
 
     @Override
-    public void registerMountPoint(NodeId nodeId) {
+    public void registerMountPoint(ActorContext context, NodeId nodeId) {
         LOG.debug("Registering mount point for node {}", nodeId.getValue());
+    }
 
+    @Override
+    public void registerMountPoint(ActorContext context, NodeId nodeId, ActorRef masterRef) {
+        LOG.debug("Registering mount point for node {}", nodeId.getValue());
     }
 
     @Override