2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.dispatch.OnComplete;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.api.NetconfMessage;
34 import org.opendaylight.netconf.api.NetconfTerminationReason;
35 import org.opendaylight.netconf.client.NetconfClientSession;
36 import org.opendaylight.netconf.client.NetconfClientSessionListener;
37 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
38 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
39 import org.opendaylight.netconf.topology.NetconfTopology;
40 import org.opendaylight.netconf.topology.NodeManager;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.RoleChangeStrategy;
43 import org.opendaylight.netconf.topology.TopologyManager;
44 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
45 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
46 import org.opendaylight.netconf.topology.util.BaseNodeManager;
47 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
48 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
49 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
65 import org.opendaylight.yangtools.yang.common.QName;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
72 public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
74 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
76 public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
78 public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
79 return new UnavailableCapabilityBuilder()
80 .setCapability(input.getKey().toString())
81 .setFailureReason(input.getValue()).build();
84 public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
86 public String apply(QName qName) {
87 // intern string representation of a capability to avoid duplicates
88 return qName.toString().intern();
92 private static final String UNKNOWN_REASON = "Unknown reason";
94 private boolean isMaster = false;
95 private ClusteredNetconfTopology topologyDispatcher;
96 private final ActorSystem actorSystem;
97 private final Cluster clusterExtension;
99 private final RoleChangeStrategy roleChangeStrategy;
101 private String nodeId;
102 private String topologyId;
103 private TopologyManager topologyManager;
104 private NodeManager nodeManager;
105 // cached context so that we can use it in callbacks from topology
106 private ActorContext cachedContext;
108 private Node currentConfig;
109 private Node currentOperationalNode;
111 private ConnectionStatusListenerRegistration connectionStatusregistration = null;
112 private NetconfClientSessionListenerRegistration sessionListener = null;
114 private ActorRef masterDataBrokerRef = null;
115 private boolean connected = false;
117 public NetconfNodeManagerCallback(final String nodeId,
118 final String topologyId,
119 final ActorSystem actorSystem,
120 final NetconfTopology topologyDispatcher,
121 final RoleChangeStrategy roleChangeStrategy) {
122 this.nodeId = nodeId;
123 this.topologyId = topologyId;
124 this.actorSystem = actorSystem;
125 this.clusterExtension = Cluster.get(actorSystem);
126 this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
127 this.roleChangeStrategy = roleChangeStrategy;
129 final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
130 topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
132 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
133 if (throwable != null) {
134 LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
138 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
139 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
141 }, actorSystem.dispatcher());
143 final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
144 nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
146 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
147 if (throwable != null) {
148 LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
150 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
151 nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
153 }, actorSystem.dispatcher());
158 @Override public Node getInitialState(@Nonnull final NodeId nodeId,
159 @Nonnull final Node configNode) {
160 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
162 final Node initialNode = new NodeBuilder()
164 .addAugmentation(NetconfNode.class,
165 new NetconfNodeBuilder()
166 .setHost(netconfNode.getHost())
167 .setPort(netconfNode.getPort())
168 .setConnectionStatus(ConnectionStatus.Connecting)
169 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
170 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
171 .setClusteredConnectionStatus(
172 new ClusteredConnectionStatusBuilder()
175 new NodeStatusBuilder()
176 .setNode(clusterExtension.selfAddress().toString())
177 .setStatus(Status.Unavailable)
183 if (currentOperationalNode == null) {
184 currentOperationalNode = initialNode;
190 @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
191 @Nullable final Node configNode) {
192 final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
194 final Node failedNode = new NodeBuilder()
196 .addAugmentation(NetconfNode.class,
197 new NetconfNodeBuilder()
198 .setHost(netconfNode.getHost())
199 .setPort(netconfNode.getPort())
200 .setConnectionStatus(ConnectionStatus.UnableToConnect)
201 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
202 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
203 .setClusteredConnectionStatus(
204 new ClusteredConnectionStatusBuilder()
206 Collections.singletonList(
207 new NodeStatusBuilder()
208 .setNode(clusterExtension.selfAddress().toString())
209 .setStatus(Status.Failed)
215 if (currentOperationalNode == null) {
216 currentOperationalNode = failedNode;
222 @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
223 @Nonnull final Node configNode) {
224 cachedContext = TypedActor.context();
225 this.nodeId = nodeId.getValue();
226 this.currentConfig = configNode;
227 // set initial state before anything happens
228 this.currentOperationalNode = getInitialState(nodeId, configNode);
230 // connect magic, send config into the netconf pipeline through topo dispatcher
231 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
233 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
235 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
236 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
237 sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
241 public void onFailure(Throwable t) {
242 LOG.error("Connection to device failed", t);
246 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
248 // transform future result into state that gets written into datastore
249 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
252 public Node apply(NetconfDeviceCapabilities input) {
254 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
255 .addAugmentation(NetconfNode.class,
256 new NetconfNodeBuilder()
257 .setConnectionStatus(ConnectionStatus.Connected)
258 .setClusteredConnectionStatus(
259 new ClusteredConnectionStatusBuilder()
261 Collections.singletonList(
262 new NodeStatusBuilder()
263 .setNode(clusterExtension.selfAddress().toString())
264 .setStatus(Status.Connected)
267 .setHost(netconfNode.getHost())
268 .setPort(netconfNode.getPort())
269 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
270 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
272 return currentOperationalNode;
279 public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
280 @Nonnull final Node configNode) {
281 // first disconnect this node
282 topologyDispatcher.unregisterMountPoint(nodeId);
284 if (connectionStatusregistration != null) {
285 connectionStatusregistration.close();
287 topologyDispatcher.disconnectNode(nodeId);
289 // now reinit this connection with new settings
290 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
292 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
294 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
295 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
299 public void onFailure(Throwable t) {
300 LOG.error("Connection to device failed", t);
304 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
306 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
309 public Node apply(NetconfDeviceCapabilities input) {
311 return new NodeBuilder()
313 .addAugmentation(NetconfNode.class,
314 new NetconfNodeBuilder()
315 .setConnectionStatus(ConnectionStatus.Connected)
316 .setClusteredConnectionStatus(
317 new ClusteredConnectionStatusBuilder()
319 Collections.singletonList(
320 new NodeStatusBuilder()
321 .setNode(clusterExtension.selfAddress().toString())
322 .setStatus(Status.Connected)
325 .setHost(netconfNode.getHost())
326 .setPort(netconfNode.getPort())
327 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
328 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
335 @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
336 // cleanup and disconnect
337 topologyDispatcher.unregisterMountPoint(nodeId);
339 if(connectionStatusregistration != null) {
340 connectionStatusregistration.close();
342 roleChangeStrategy.unregisterRoleCandidate();
343 return topologyDispatcher.disconnectNode(nodeId);
348 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
349 LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
350 return Futures.immediateFuture(currentOperationalNode);
354 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
355 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
357 isMaster = roleChangeDTO.isOwner();
361 public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
362 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
365 LOG.debug("Master is done with schema resolution, registering mount point");
366 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
367 } else if (masterDataBrokerRef != null) {
368 LOG.warn("Device connected, master already present in topology, registering mount point");
369 topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
372 List<String> capabilityList = new ArrayList<>();
373 capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
374 capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
375 final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
376 avCapabalitiesBuilder.setAvailableCapability(capabilityList);
378 final UnavailableCapabilities unavailableCapabilities =
379 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
380 .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
382 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
383 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
384 .addAugmentation(NetconfNode.class,
385 new NetconfNodeBuilder()
386 .setConnectionStatus(ConnectionStatus.Connected)
387 .setClusteredConnectionStatus(
388 new ClusteredConnectionStatusBuilder()
390 Collections.singletonList(
391 new NodeStatusBuilder()
392 .setNode(clusterExtension.selfAddress().toString())
393 .setStatus(Status.Connected)
396 .setHost(netconfNode.getHost())
397 .setPort(netconfNode.getPort())
398 .setAvailableCapabilities(avCapabalitiesBuilder.build())
399 .setUnavailableCapabilities(unavailableCapabilities)
402 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
406 public void onDeviceDisconnected() {
407 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
408 LOG.debug("onDeviceDisconnected received, unregistered role candidate");
411 // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
413 // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
414 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
417 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
418 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
419 .addAugmentation(NetconfNode.class,
420 new NetconfNodeBuilder()
421 .setConnectionStatus(ConnectionStatus.Connecting)
422 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
423 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
424 .setClusteredConnectionStatus(
425 new ClusteredConnectionStatusBuilder()
427 Collections.singletonList(
428 new NodeStatusBuilder()
429 .setNode(clusterExtension.selfAddress().toString())
430 .setStatus(Status.Unavailable)
433 .setHost(netconfNode.getHost())
434 .setPort(netconfNode.getPort())
436 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
440 public void onDeviceFailed(Throwable throwable) {
441 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
442 // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
443 LOG.warn("Netconf node {} failed with {}", nodeId, throwable);
445 String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
447 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
448 .addAugmentation(NetconfNode.class,
449 new NetconfNodeBuilder()
450 .setConnectionStatus(ConnectionStatus.UnableToConnect)
451 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
452 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
453 .setClusteredConnectionStatus(
454 new ClusteredConnectionStatusBuilder()
456 Collections.singletonList(
457 new NodeStatusBuilder()
458 .setNode(clusterExtension.selfAddress().toString())
459 .setStatus(Status.Failed)
462 .setConnectedMessage(reason)
464 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
468 public void onNotification(DOMNotification domNotification) {
473 public void close() {
478 public void onReceive(Object message, ActorRef actorRef) {
479 LOG.debug("Netconf node callback received message {}", message);
480 if (message instanceof AnnounceMasterMountPoint) {
481 masterDataBrokerRef = actorRef;
482 // candidate gets registered when mount point is already prepared so we can go ahead a register it
484 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
486 LOG.debug("Announce master mount point msg received but mount point is not ready yet");
488 } else if (message instanceof AnnounceMasterMountPointDown) {
489 LOG.debug("Master mountpoint went down");
490 masterDataBrokerRef = null;
491 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
496 public void onSessionUp(NetconfClientSession netconfClientSession) {
497 //NetconfClientSession is up, we can register role candidate
498 LOG.debug("Netconf client session is up, registering role candidate");
499 roleChangeStrategy.registerRoleCandidate(nodeManager);
503 public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
504 LOG.debug("Netconf client session is down, unregistering role candidate");
505 roleChangeStrategy.unregisterRoleCandidate();
509 public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
510 LOG.debug("Netconf client session is down, unregistering role candidate");
511 roleChangeStrategy.unregisterRoleCandidate();
515 public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {