2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.dispatch.OnComplete;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.api.NetconfMessage;
34 import org.opendaylight.netconf.api.NetconfTerminationReason;
35 import org.opendaylight.netconf.client.NetconfClientSession;
36 import org.opendaylight.netconf.client.NetconfClientSessionListener;
37 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
38 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
39 import org.opendaylight.netconf.topology.NetconfTopology;
40 import org.opendaylight.netconf.topology.NodeManager;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.RoleChangeStrategy;
43 import org.opendaylight.netconf.topology.TopologyManager;
44 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
45 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
46 import org.opendaylight.netconf.topology.util.BaseNodeManager;
47 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
48 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
49 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
50 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
66 import org.opendaylight.yangtools.yang.common.QName;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.concurrent.Future;
71 import scala.concurrent.duration.FiniteDuration;
/**
 * Callback object for a single netconf node in a clustered netconf topology.
 *
 * <p>It connects and disconnects the node through the topology dispatcher, keeps the node's
 * current operational state in {@code currentOperationalNode}, registers and unregisters mount
 * points, and registers this node as a role candidate while its netconf client session is up.
 */
73 public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
// Class-wide SLF4J logger.
75 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
77 public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
79 public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
80 return new UnavailableCapabilityBuilder()
81 .setCapability(input.getKey().toString())
82 .setFailureReason(input.getValue()).build();
85 public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
87 public String apply(QName qName) {
88 // intern string representation of a capability to avoid duplicates
89 return qName.toString().intern();
// Text stored as the connected message when a failure carries no message of its own.
93 private static final String UNKNOWN_REASON = "Unknown reason";
// Ownership flag; assigned from RoleChangeDTO.isOwner() in onRoleChanged().
95 private boolean isMaster = false;
// Dispatcher used to connect/disconnect the node and to (un)register mount points and listeners.
96 private ClusteredNetconfTopology topologyDispatcher;
// Actor system used for the actor lookups started in the constructor.
97 private final ActorSystem actorSystem;
// Cluster extension of actorSystem; its selfAddress() labels this member in node-status entries.
98 private final Cluster clusterExtension;
// Registers/unregisters this node as a role candidate (see the session callbacks).
100 private final RoleChangeStrategy roleChangeStrategy;
// Node id as a plain string; re-assigned from NodeId.getValue() in onNodeCreated().
102 private String nodeId;
103 private String topologyId;
// Typed-actor proxies; assigned asynchronously once the constructor's actor lookups complete.
104 private TopologyManager topologyManager;
105 private NodeManager nodeManager;
106 // cached context so that we can use it in callbacks from topology
107 private ActorContext cachedContext;
// Config node received in onNodeCreated(); host and port are read from it by the device callbacks.
109 private Node currentConfig;
// Latest operational view of this node; returned by getCurrentStatusForNode().
110 private Node currentOperationalNode;
// Registration handles obtained from the dispatcher after a successful connect.
112 private ConnectionStatusListenerRegistration connectionStatusregistration = null;
113 private NetconfClientSessionListenerRegistration sessionListener = null;
// Sender of the last AnnounceMasterMountPoint message; reset to null on AnnounceMasterMountPointDown.
115 private ActorRef masterDataBrokerRef = null;
// NOTE(review): no read or write of this flag is visible in this excerpt — confirm where it is maintained.
116 private boolean connected = false;
/**
 * Creates the callback for one netconf node and starts two asynchronous actor lookups: one for
 * the topology manager and one for this node's manager.
 *
 * <p>Both lookups complete later on the actor system's dispatcher, so {@code topologyManager}
 * and {@code nodeManager} are still {@code null} when the constructor returns. When a lookup
 * fails, a warning is logged and the corresponding field stays {@code null}; no proxy is built
 * from the unusable actor reference.
 *
 * @param nodeId             id of the node this callback serves
 * @param topologyId         id of the topology; also used to build the actor paths
 * @param actorSystem        actor system used for the lookups and the cluster extension
 * @param topologyDispatcher dispatcher for the netconf pipeline; it is cast to
 *                           {@link ClusteredNetconfTopology} unconditionally
 * @param roleChangeStrategy strategy used to register/unregister this node as role candidate
 */
public NetconfNodeManagerCallback(final String nodeId,
                                  final String topologyId,
                                  final ActorSystem actorSystem,
                                  final NetconfTopology topologyDispatcher,
                                  final RoleChangeStrategy roleChangeStrategy) {
    this.nodeId = nodeId;
    this.topologyId = topologyId;
    this.actorSystem = actorSystem;
    this.clusterExtension = Cluster.get(actorSystem);
    this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
    this.roleChangeStrategy = roleChangeStrategy;

    final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(topologyId);
    final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection(pathCreator.build()).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
    topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
        @Override
        public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
            if (throwable != null) {
                LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
                // the lookup failed, so there is no actor reference to wrap
                return;
            }

            LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
            topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
        }
    }, actorSystem.dispatcher());

    final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection(pathCreator.withSuffix(nodeId).build()).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
    nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
        @Override
        public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
            if (throwable != null) {
                LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
                // the lookup failed, so there is no actor reference to wrap
                return;
            }
            // log the node path that was actually resolved, not just the topology path
            LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId + "/" + nodeId);
            nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
        }
    }, actorSystem.dispatcher());
}
160 @Override public Node getInitialState(@Nonnull final NodeId nodeId,
161 @Nonnull final Node configNode) {
162 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
164 final Node initialNode = new NodeBuilder()
166 .addAugmentation(NetconfNode.class,
167 new NetconfNodeBuilder()
168 .setHost(netconfNode.getHost())
169 .setPort(netconfNode.getPort())
170 .setConnectionStatus(ConnectionStatus.Connecting)
171 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
172 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
173 .setClusteredConnectionStatus(
174 new ClusteredConnectionStatusBuilder()
177 new NodeStatusBuilder()
178 .setNode(clusterExtension.selfAddress().toString())
179 .setStatus(Status.Unavailable)
185 if (currentOperationalNode == null) {
186 currentOperationalNode = initialNode;
192 @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
193 @Nullable final Node configNode) {
194 final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
196 final Node failedNode = new NodeBuilder()
198 .addAugmentation(NetconfNode.class,
199 new NetconfNodeBuilder()
200 .setHost(netconfNode.getHost())
201 .setPort(netconfNode.getPort())
202 .setConnectionStatus(ConnectionStatus.UnableToConnect)
203 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
204 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
205 .setClusteredConnectionStatus(
206 new ClusteredConnectionStatusBuilder()
208 Collections.singletonList(
209 new NodeStatusBuilder()
210 .setNode(clusterExtension.selfAddress().toString())
211 .setStatus(Status.Failed)
217 if (currentOperationalNode == null) {
218 currentOperationalNode = failedNode;
224 @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
225 @Nonnull final Node configNode) {
226 cachedContext = TypedActor.context();
227 this.nodeId = nodeId.getValue();
228 this.currentConfig = configNode;
229 // set initial state before anything happens
230 this.currentOperationalNode = getInitialState(nodeId, configNode);
232 // connect magic, send config into the netconf pipeline through topo dispatcher
233 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
235 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
237 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
238 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
239 sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
243 public void onFailure(Throwable t) {
244 LOG.error("Connection to device failed", t);
248 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
250 // transform future result into state that gets written into datastore
251 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
254 public Node apply(NetconfDeviceCapabilities input) {
256 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
257 .addAugmentation(NetconfNode.class,
258 new NetconfNodeBuilder()
259 .setConnectionStatus(ConnectionStatus.Connected)
260 .setClusteredConnectionStatus(
261 new ClusteredConnectionStatusBuilder()
263 Collections.singletonList(
264 new NodeStatusBuilder()
265 .setNode(clusterExtension.selfAddress().toString())
266 .setStatus(Status.Connected)
269 .setHost(netconfNode.getHost())
270 .setPort(netconfNode.getPort())
271 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
272 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
274 return currentOperationalNode;
281 public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
282 @Nonnull final Node configNode) {
283 // first disconnect this node
284 topologyDispatcher.unregisterMountPoint(nodeId);
286 if (connectionStatusregistration != null) {
287 connectionStatusregistration.close();
289 topologyDispatcher.disconnectNode(nodeId);
291 // now reinit this connection with new settings
292 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
294 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
296 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
297 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
301 public void onFailure(Throwable t) {
302 LOG.error("Connection to device failed", t);
306 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
308 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
311 public Node apply(NetconfDeviceCapabilities input) {
313 return new NodeBuilder()
315 .addAugmentation(NetconfNode.class,
316 new NetconfNodeBuilder()
317 .setConnectionStatus(ConnectionStatus.Connected)
318 .setClusteredConnectionStatus(
319 new ClusteredConnectionStatusBuilder()
321 Collections.singletonList(
322 new NodeStatusBuilder()
323 .setNode(clusterExtension.selfAddress().toString())
324 .setStatus(Status.Connected)
327 .setHost(netconfNode.getHost())
328 .setPort(netconfNode.getPort())
329 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
330 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
337 @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
338 // cleanup and disconnect
339 topologyDispatcher.unregisterMountPoint(nodeId);
341 if(connectionStatusregistration != null) {
342 connectionStatusregistration.close();
344 roleChangeStrategy.unregisterRoleCandidate();
345 return topologyDispatcher.disconnectNode(nodeId);
350 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
351 LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
352 return Futures.immediateFuture(currentOperationalNode);
356 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
357 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
359 isMaster = roleChangeDTO.isOwner();
/**
 * Invoked when the device connection is established. Registers a mount point, rebuilds
 * currentOperationalNode with status Connected plus the device's available and unavailable
 * capabilities, and asks the topology manager to re-evaluate this node's status.
 *
 * @param remoteSchemaContext       not read by the lines visible here
 * @param netconfSessionPreferences source of the non-module-based, resolved and unresolved capabilities
 * @param deviceRpc                 not read by the lines visible here
 */
363 public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
364 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
// NOTE(review): the condition opening this if/else chain is not visible in this excerpt; the log
// text suggests it tests isMaster — confirm against the complete file.
367 LOG.debug("Master is done with schema resolution, registering mount point");
// master path: mount point registered with the context of the current typed-actor call
368 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
369 } else if (masterDataBrokerRef != null) {
370 LOG.warn("Device connected, master already present in topology, registering mount point");
// non-master path: uses the context cached in onNodeCreated and the announced master reference
371 topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
// available capabilities = non-module-based capabilities followed by the resolved ones (as interned strings)
374 List<String> capabilityList = new ArrayList<>();
375 capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
376 capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
377 final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
378 avCapabalitiesBuilder.setAvailableCapability(capabilityList);
// unavailable capabilities = every unresolved capability together with its failure reason
380 final UnavailableCapabilities unavailableCapabilities =
381 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
382 .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
// host and port are taken from the stored config node
384 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
385 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
386 .addAugmentation(NetconfNode.class,
387 new NetconfNodeBuilder()
388 .setConnectionStatus(ConnectionStatus.Connected)
389 .setClusteredConnectionStatus(
390 new ClusteredConnectionStatusBuilder()
392 Collections.singletonList(
393 new NodeStatusBuilder()
394 .setNode(clusterExtension.selfAddress().toString())
395 .setStatus(Status.Connected)
398 .setHost(netconfNode.getHost())
399 .setPort(netconfNode.getPort())
400 .setAvailableCapabilities(avCapabalitiesBuilder.build())
401 .setUnavailableCapabilities(unavailableCapabilities)
// topologyManager is assigned asynchronously by the constructor's lookup
404 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
/**
 * Invoked when the device connection is lost. Unregisters the mount point, rebuilds
 * currentOperationalNode with status Connecting, empty capability lists and this cluster member
 * reported as Unavailable, and asks the topology manager to re-evaluate this node's status.
 */
408 public void onDeviceDisconnected() {
409 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
410 LOG.debug("onDeviceDisconnected received, unregistered role candidate");
// NOTE(review): the statements that belonged between the following comments are not visible in
// this excerpt; confirm against the complete file what guards the unregister call below.
413 // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
415 // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
416 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
// host and port are taken from the stored config node
419 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
420 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
421 .addAugmentation(NetconfNode.class,
422 new NetconfNodeBuilder()
423 .setConnectionStatus(ConnectionStatus.Connecting)
424 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
425 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
426 .setClusteredConnectionStatus(
427 new ClusteredConnectionStatusBuilder()
429 Collections.singletonList(
430 new NodeStatusBuilder()
431 .setNode(clusterExtension.selfAddress().toString())
432 .setStatus(Status.Unavailable)
435 .setHost(netconfNode.getHost())
436 .setPort(netconfNode.getPort())
// topologyManager is assigned asynchronously by the constructor's lookup
438 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
/**
 * Invoked when the device connection fails. Rebuilds currentOperationalNode with status
 * UnableToConnect, empty capability lists, this cluster member reported as Failed and the
 * failure reason as connected message, then asks the topology manager to re-evaluate this
 * node's status.
 *
 * @param throwable cause of the failure; may be null, in which case UNKNOWN_REASON is reported
 */
442 public void onDeviceFailed(Throwable throwable) {
443 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
444 // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
// NOTE(review): throwable is the last argument, so SLF4J is expected to log it as the exception
// rather than substitute it for the second {} — confirm the message renders as intended.
445 LOG.warn("Netconf node {} failed with {}", nodeId, throwable);
// use the exception message when there is one, otherwise the generic fallback text
447 String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
449 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
450 .addAugmentation(NetconfNode.class,
451 new NetconfNodeBuilder()
452 .setConnectionStatus(ConnectionStatus.UnableToConnect)
453 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
454 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
455 .setClusteredConnectionStatus(
456 new ClusteredConnectionStatusBuilder()
458 Collections.singletonList(
459 new NodeStatusBuilder()
460 .setNode(clusterExtension.selfAddress().toString())
461 .setStatus(Status.Failed)
464 .setConnectedMessage(reason)
// topologyManager is assigned asynchronously by the constructor's lookup
466 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
/**
 * Notification callback taking a {@link DOMNotification}.
 * NOTE(review): the method body is not visible in this excerpt.
 */
470 public void onNotification(DOMNotification domNotification) {
/**
 * Close hook of this callback.
 * NOTE(review): the method body is not visible in this excerpt.
 */
475 public void close() {
/**
 * Handles messages delivered to this node callback. Only master mount point announcements are
 * acted upon; no handling of other message types is visible here.
 *
 * @param message  received message
 * @param actorRef reference stored as masterDataBrokerRef when the message is an AnnounceMasterMountPoint
 */
480 public void onReceive(Object message, ActorRef actorRef) {
481 LOG.debug("Netconf node callback received message {}", message);
482 if (message instanceof AnnounceMasterMountPoint) {
// remember the announced master so the mount point can also be registered later
483 masterDataBrokerRef = actorRef;
484 // candidate gets registered when mount point is already prepared so we can go ahead a register it
// NOTE(review): the condition choosing between the register call and the "not ready yet" log is
// not visible in this excerpt — confirm against the complete file.
486 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
488 LOG.debug("Announce master mount point msg received but mount point is not ready yet");
490 } else if (message instanceof AnnounceMasterMountPointDown) {
491 LOG.debug("Master mountpoint went down");
// forget the master and drop the mount point
492 masterDataBrokerRef = null;
493 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
498 public void onSessionUp(NetconfClientSession netconfClientSession) {
499 //NetconfClientSession is up, we can register role candidate
500 LOG.debug("Netconf client session is up, registering role candidate");
501 roleChangeStrategy.registerRoleCandidate(nodeManager);
505 public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
506 LOG.debug("Netconf client session is down, unregistering role candidate");
507 roleChangeStrategy.unregisterRoleCandidate();
511 public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
512 LOG.debug("Netconf client session is down, unregistering role candidate");
513 roleChangeStrategy.unregisterRoleCandidate();
517 public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {