2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.dispatch.OnComplete;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.api.NetconfMessage;
34 import org.opendaylight.netconf.api.NetconfTerminationReason;
35 import org.opendaylight.netconf.client.NetconfClientSession;
36 import org.opendaylight.netconf.client.NetconfClientSessionListener;
37 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
38 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
39 import org.opendaylight.netconf.topology.NetconfTopology;
40 import org.opendaylight.netconf.topology.NodeManager;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.RoleChangeStrategy;
43 import org.opendaylight.netconf.topology.TopologyManager;
44 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
45 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
46 import org.opendaylight.netconf.topology.util.BaseNodeManager;
47 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
48 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
49 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
50 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
66 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
67 import org.opendaylight.yangtools.yang.common.QName;
68 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71 import scala.concurrent.Future;
72 import scala.concurrent.duration.FiniteDuration;
74 public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
76 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
78 public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
80 public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
81 return new UnavailableCapabilityBuilder()
82 .setCapability(input.getKey().toString())
83 .setFailureReason(input.getValue()).build();
87 private static final String UNKNOWN_REASON = "Unknown reason";
89 private boolean isMaster = false;
90 private ClusteredNetconfTopology topologyDispatcher;
91 private final ActorSystem actorSystem;
92 private final Cluster clusterExtension;
94 private final RoleChangeStrategy roleChangeStrategy;
96 private String nodeId;
97 private String topologyId;
98 private TopologyManager topologyManager;
99 private NodeManager nodeManager;
100 // cached context so that we can use it in callbacks from topology
101 private ActorContext cachedContext;
103 private Node currentConfig;
104 private Node currentOperationalNode;
106 private ConnectionStatusListenerRegistration connectionStatusregistration = null;
107 private NetconfClientSessionListenerRegistration sessionListener = null;
109 private ActorRef masterDataBrokerRef = null;
110 private boolean connected = false;
/**
 * Creates the callback for one netconf node and starts asynchronous resolution of the topology-manager
 * and node-manager actors.
 *
 * <p>Both lookups use a 10 second timeout and complete on the actor system's dispatcher. When a lookup
 * fails, a warning is logged and the corresponding proxy field is left unassigned.
 *
 * @param nodeId             id of the node this callback serves; also the suffix of the node actor path
 * @param topologyId         id of the owning topology; used to build the actor paths
 * @param actorSystem        actor system used for actor selection and typed-actor proxies
 * @param topologyDispatcher topology facade; cast unconditionally to {@link ClusteredNetconfTopology}
 * @param roleChangeStrategy strategy used for role candidate registration
 * @throws ClassCastException if {@code topologyDispatcher} is not a {@link ClusteredNetconfTopology}
 */
public NetconfNodeManagerCallback(final String nodeId,
                                  final String topologyId,
                                  final ActorSystem actorSystem,
                                  final NetconfTopology topologyDispatcher,
                                  final RoleChangeStrategy roleChangeStrategy) {
    this.nodeId = nodeId;
    this.topologyId = topologyId;
    this.actorSystem = actorSystem;
    this.clusterExtension = Cluster.get(actorSystem);
    this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
    this.roleChangeStrategy = roleChangeStrategy;

    // paths are only used for logging; the real selection path comes from the path creator
    final String topologyPath = "/user/" + topologyId;
    final String nodePath = topologyPath + "/" + nodeId;

    final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(topologyId);
    final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection(pathCreator.build()).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
    topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
        @Override
        public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
            if (throwable != null) {
                LOG.warn("Unable to resolve actor for path: {} ", topologyPath, throwable);
                // nothing was resolved, so there is no actor ref to wrap in a typed proxy
                return;
            }

            LOG.debug("Actor ref for path {} resolved", topologyPath);
            topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
        }
    }, actorSystem.dispatcher());

    final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection(pathCreator.withSuffix(nodeId).build()).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
    nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
        @Override
        public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
            if (throwable != null) {
                LOG.warn("Unable to resolve actor for path: {} ", nodePath, throwable);
                // nothing was resolved, so there is no actor ref to wrap in a typed proxy
                return;
            }

            LOG.debug("Actor ref for path {} resolved", nodePath);
            nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
        }
    }, actorSystem.dispatcher());
}
154 @Override public Node getInitialState(@Nonnull final NodeId nodeId,
155 @Nonnull final Node configNode) {
156 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
158 final Node initialNode = new NodeBuilder()
160 .addAugmentation(NetconfNode.class,
161 new NetconfNodeBuilder()
162 .setHost(netconfNode.getHost())
163 .setPort(netconfNode.getPort())
164 .setConnectionStatus(ConnectionStatus.Connecting)
165 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
166 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
167 .setClusteredConnectionStatus(
168 new ClusteredConnectionStatusBuilder()
171 new NodeStatusBuilder()
172 .setNode(clusterExtension.selfAddress().toString())
173 .setStatus(Status.Unavailable)
179 if (currentOperationalNode == null) {
180 currentOperationalNode = initialNode;
186 @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
187 @Nullable final Node configNode) {
188 final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
190 final Node failedNode = new NodeBuilder()
192 .addAugmentation(NetconfNode.class,
193 new NetconfNodeBuilder()
194 .setHost(netconfNode.getHost())
195 .setPort(netconfNode.getPort())
196 .setConnectionStatus(ConnectionStatus.UnableToConnect)
197 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
198 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
199 .setClusteredConnectionStatus(
200 new ClusteredConnectionStatusBuilder()
202 Collections.singletonList(
203 new NodeStatusBuilder()
204 .setNode(clusterExtension.selfAddress().toString())
205 .setStatus(Status.Failed)
211 if (currentOperationalNode == null) {
212 currentOperationalNode = failedNode;
218 @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
219 @Nonnull final Node configNode) {
220 cachedContext = TypedActor.context();
221 this.nodeId = nodeId.getValue();
222 this.currentConfig = configNode;
223 // set initial state before anything happens
224 this.currentOperationalNode = getInitialState(nodeId, configNode);
226 // connect magic, send config into the netconf pipeline through topo dispatcher
227 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
229 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
231 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
232 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
233 sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
237 public void onFailure(Throwable t) {
238 LOG.error("Connection to device failed", t);
242 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
244 // transform future result into state that gets written into datastore
245 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
248 public Node apply(NetconfDeviceCapabilities input) {
250 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
251 .addAugmentation(NetconfNode.class,
252 new NetconfNodeBuilder()
253 .setConnectionStatus(ConnectionStatus.Connected)
254 .setClusteredConnectionStatus(
255 new ClusteredConnectionStatusBuilder()
257 Collections.singletonList(
258 new NodeStatusBuilder()
259 .setNode(clusterExtension.selfAddress().toString())
260 .setStatus(Status.Connected)
263 .setHost(netconfNode.getHost())
264 .setPort(netconfNode.getPort())
265 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
266 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
268 return currentOperationalNode;
275 public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
276 @Nonnull final Node configNode) {
277 // first disconnect this node
278 topologyDispatcher.unregisterMountPoint(nodeId);
280 if (connectionStatusregistration != null) {
281 connectionStatusregistration.close();
283 topologyDispatcher.disconnectNode(nodeId);
285 // now reinit this connection with new settings
286 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
288 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
290 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
291 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
295 public void onFailure(Throwable t) {
296 LOG.error("Connection to device failed", t);
300 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
302 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
305 public Node apply(NetconfDeviceCapabilities input) {
307 return new NodeBuilder()
309 .addAugmentation(NetconfNode.class,
310 new NetconfNodeBuilder()
311 .setConnectionStatus(ConnectionStatus.Connected)
312 .setClusteredConnectionStatus(
313 new ClusteredConnectionStatusBuilder()
315 Collections.singletonList(
316 new NodeStatusBuilder()
317 .setNode(clusterExtension.selfAddress().toString())
318 .setStatus(Status.Connected)
321 .setHost(netconfNode.getHost())
322 .setPort(netconfNode.getPort())
323 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
324 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
331 @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
332 // cleanup and disconnect
333 topologyDispatcher.unregisterMountPoint(nodeId);
335 if(connectionStatusregistration != null) {
336 connectionStatusregistration.close();
338 roleChangeStrategy.unregisterRoleCandidate();
339 return topologyDispatcher.disconnectNode(nodeId);
344 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
345 LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
346 return Futures.immediateFuture(currentOperationalNode);
350 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
351 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
353 isMaster = roleChangeDTO.isOwner();
357 public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
358 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
361 LOG.debug("Master is done with schema resolution, registering mount point");
362 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
363 } else if (masterDataBrokerRef != null) {
364 LOG.warn("Device connected, master already present in topology, registering mount point");
365 topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
368 List<AvailableCapability> capabilityList = new ArrayList<>();
369 capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
370 capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities());
371 final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
372 avCapabalitiesBuilder.setAvailableCapability(capabilityList);
374 final UnavailableCapabilities unavailableCapabilities =
375 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
376 .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
378 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
379 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
380 .addAugmentation(NetconfNode.class,
381 new NetconfNodeBuilder()
382 .setConnectionStatus(ConnectionStatus.Connected)
383 .setClusteredConnectionStatus(
384 new ClusteredConnectionStatusBuilder()
386 Collections.singletonList(
387 new NodeStatusBuilder()
388 .setNode(clusterExtension.selfAddress().toString())
389 .setStatus(Status.Connected)
392 .setHost(netconfNode.getHost())
393 .setPort(netconfNode.getPort())
394 .setAvailableCapabilities(avCapabalitiesBuilder.build())
395 .setUnavailableCapabilities(unavailableCapabilities)
398 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
402 public void onDeviceDisconnected() {
403 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
404 LOG.debug("onDeviceDisconnected received, unregistered role candidate");
407 // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
409 // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
410 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
413 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
414 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
415 .addAugmentation(NetconfNode.class,
416 new NetconfNodeBuilder()
417 .setConnectionStatus(ConnectionStatus.Connecting)
418 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
419 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
420 .setClusteredConnectionStatus(
421 new ClusteredConnectionStatusBuilder()
423 Collections.singletonList(
424 new NodeStatusBuilder()
425 .setNode(clusterExtension.selfAddress().toString())
426 .setStatus(Status.Unavailable)
429 .setHost(netconfNode.getHost())
430 .setPort(netconfNode.getPort())
432 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
436 public void onDeviceFailed(Throwable throwable) {
437 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
438 // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
439 LOG.warn("Netconf node {} failed with {}", nodeId, throwable);
441 String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
443 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
444 .addAugmentation(NetconfNode.class,
445 new NetconfNodeBuilder()
446 .setConnectionStatus(ConnectionStatus.UnableToConnect)
447 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
448 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
449 .setClusteredConnectionStatus(
450 new ClusteredConnectionStatusBuilder()
452 Collections.singletonList(
453 new NodeStatusBuilder()
454 .setNode(clusterExtension.selfAddress().toString())
455 .setStatus(Status.Failed)
458 .setConnectedMessage(reason)
460 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
464 public void onNotification(DOMNotification domNotification) {
469 public void close() {
474 public void onReceive(Object message, ActorRef actorRef) {
475 LOG.debug("Netconf node callback received message {}", message);
476 if (message instanceof AnnounceMasterMountPoint) {
477 masterDataBrokerRef = actorRef;
478 // candidate gets registered when mount point is already prepared so we can go ahead a register it
480 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
482 LOG.debug("Announce master mount point msg received but mount point is not ready yet");
484 } else if (message instanceof AnnounceMasterMountPointDown) {
485 LOG.debug("Master mountpoint went down");
486 masterDataBrokerRef = null;
487 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
492 public void onSessionUp(NetconfClientSession netconfClientSession) {
493 //NetconfClientSession is up, we can register role candidate
494 LOG.debug("Netconf client session is up, registering role candidate");
495 roleChangeStrategy.registerRoleCandidate(nodeManager);
499 public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
500 LOG.debug("Netconf client session is down, unregistering role candidate");
501 roleChangeStrategy.unregisterRoleCandidate();
505 public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
506 LOG.debug("Netconf client session is down, unregistering role candidate");
507 roleChangeStrategy.unregisterRoleCandidate();
511 public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {