RemoteDeviceDataBroker proxy
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfNodeManagerCallback.java
index a17650a6fb0403a30eeb33f445dd671682c548e2..5f0eb2e179ffe910b1947375984d2e400833aae4 100644 (file)
@@ -8,11 +8,13 @@
 
 package org.opendaylight.netconf.topology.impl;
 
+import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.TypedActor;
 import akka.actor.TypedProps;
 import akka.cluster.Cluster;
+import akka.cluster.Member;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
@@ -29,15 +31,18 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
-import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
 import org.opendaylight.netconf.topology.NetconfTopology;
+import org.opendaylight.netconf.topology.NodeManager;
 import org.opendaylight.netconf.topology.NodeManagerCallback;
 import org.opendaylight.netconf.topology.RoleChangeStrategy;
 import org.opendaylight.netconf.topology.TopologyManager;
 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
+import org.opendaylight.netconf.topology.util.BaseNodeManager;
 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
+import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
@@ -60,7 +65,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler<NetconfSessionPreferences>{
+public class NetconfNodeManagerCallback implements NodeManagerCallback{
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
 
@@ -92,12 +97,18 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
     private String nodeId;
     private String topologyId;
     private TopologyManager topologyManager;
+    private NodeManager nodeManager;
+    // cached context so that we can use it in callbacks from topology
+    private ActorContext cachedContext;
 
     private Node currentConfig;
     private Node currentOperationalNode;
 
     private ConnectionStatusListenerRegistration registration = null;
 
+    private ActorRef masterDataBrokerRef = null;
+    private boolean connected = false;
+
     public NetconfNodeManagerCallback(final String nodeId,
                                       final String topologyId,
                                       final ActorSystem actorSystem,
@@ -123,6 +134,18 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
                 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
             }
         }, actorSystem.dispatcher());
+
+        final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
+        nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
+            @Override
+            public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
+                if (throwable != null) {
+                    LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
+                }
+                LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
+                nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
+            }
+        }, actorSystem.dispatcher());
     }
 
 
@@ -183,6 +206,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
 
     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
                                                                    @Nonnull final Node configNode) {
+        cachedContext = TypedActor.context();
         this.nodeId = nodeId.getValue();
         this.currentConfig = configNode;
         // set initial state before anything happens
@@ -194,7 +218,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
             @Override
             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
-                registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
+                registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
             }
 
             @Override
@@ -284,7 +308,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
                                         .build())
-                                .build();
+                        .build();
             }
         });
     }
@@ -306,16 +330,20 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
 
     @Override
     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
-        if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) {
-            return;
-        }
+        topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+
         isMaster = roleChangeDTO.isOwner();
-        //TODO instead of registering mount point, init remote schema repo when its done
         if (isMaster) {
-            // unregister old mountPoint if ownership changed, register a new one
-            topologyDispatcher.registerMountPoint(new NodeId(nodeId));
+            LOG.warn("Gained ownership of node - registering master mount point");
+            topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
         } else {
-            topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+            // even though mount point is ready, we dont know who the master mount point will be since we havent received the announce msg
+            // after we receive the message we can go ahead and register the mount point
+            if (connected && masterDataBrokerRef != null) {
+                topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+            } else {
+                LOG.debug("Mount point is ready, still waiting for master mount point");
+            }
         }
     }
 
@@ -323,7 +351,13 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
         LOG.debug("onDeviceConnected received, registering role candidate");
-        roleChangeStrategy.registerRoleCandidate(this);
+        connected = true;
+        roleChangeStrategy.registerRoleCandidate(nodeManager);
+        if (!isMaster && masterDataBrokerRef != null) {
+            // if we're not master but one is present already, we need to register mountpoint
+            LOG.warn("Device connected, master already present in topology, registering mount point");
+            topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
+        }
         List<String> capabilityList = new ArrayList<>();
         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
         capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
@@ -354,17 +388,26 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
                                 .setUnavailableCapabilities(unavailableCapabilities)
                                 .build())
                 .build();
-        // TODO need to implement forwarding of this msg to master
         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
     }
 
     @Override
     public void onDeviceDisconnected() {
         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
-        // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
         LOG.debug("onDeviceDisconnected received, unregistering role candidate");
-        topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
+        connected = false;
+        if (isMaster) {
+            // announce that master mount point is going down
+            for (final Member member : clusterExtension.state().getMembers()) {
+                actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
+            }
+            // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
+            isMaster = false;
+            // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
+            topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+        }
         roleChangeStrategy.unregisterRoleCandidate();
+
         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
                 .addAugmentation(NetconfNode.class,
@@ -382,7 +425,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
                                 .setHost(netconfNode.getHost())
                                 .setPort(netconfNode.getPort())
                                 .build()).build();
-        // TODO need to implement forwarding of this msg to master
         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
     }
 
@@ -391,6 +433,7 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
         LOG.debug("onDeviceFailed received");
+        connected = false;
         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
 
         roleChangeStrategy.unregisterRoleCandidate();
@@ -412,7 +455,6 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
     }
 
-
     @Override
     public void onNotification(DOMNotification domNotification) {
         //NOOP
@@ -424,7 +466,20 @@ public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDe
     }
 
     @Override
-    public void onReceive(Object o, ActorRef actorRef) {
-
+    public void onReceive(Object message, ActorRef actorRef) {
+        LOG.warn("Netconf node callback received message {}", message);
+        if (message instanceof AnnounceMasterMountPoint) {
+            masterDataBrokerRef = actorRef;
+            // candidate gets registered when mount point is already prepared so we can go ahead a register it
+            if (roleChangeStrategy.isCandidateRegistered()) {
+                topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
+            } else {
+                LOG.warn("Announce master mount point msg received but mount point is not ready yet");
+            }
+        } else if (message instanceof AnnounceMasterMountPointDown) {
+            LOG.warn("Master mountpoint went down");
+            masterDataBrokerRef = null;
+            topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
+        }
     }
 }
\ No newline at end of file