/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.netconf.topology.impl;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.TypedActor;
import akka.actor.TypedProps;
import akka.cluster.Cluster;
import akka.cluster.Member;
import akka.dispatch.OnComplete;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.netconf.topology.NetconfTopology;
import org.opendaylight.netconf.topology.NodeManager;
import org.opendaylight.netconf.topology.NodeManagerCallback;
import org.opendaylight.netconf.topology.RoleChangeStrategy;
import org.opendaylight.netconf.topology.TopologyManager;
import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
import org.opendaylight.netconf.topology.util.BaseNodeManager;
import org.opendaylight.netconf.topology.util.BaseTopologyManager;
import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
68 public class NetconfNodeManagerCallback implements NodeManagerCallback{
70 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
72 public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
74 public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
75 return new UnavailableCapabilityBuilder()
76 .setCapability(input.getKey().toString())
77 .setFailureReason(input.getValue()).build();
80 public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
82 public String apply(QName qName) {
83 // intern string representation of a capability to avoid duplicates
84 return qName.toString().intern();
88 private static final String UNKNOWN_REASON = "Unknown reason";
90 private boolean isMaster = false;
91 private ClusteredNetconfTopology topologyDispatcher;
92 private final ActorSystem actorSystem;
93 private final Cluster clusterExtension;
95 private final RoleChangeStrategy roleChangeStrategy;
97 private String nodeId;
98 private String topologyId;
99 private TopologyManager topologyManager;
100 private NodeManager nodeManager;
101 // cached context so that we can use it in callbacks from topology
102 private ActorContext cachedContext;
104 private Node currentConfig;
105 private Node currentOperationalNode;
107 private ConnectionStatusListenerRegistration registration = null;
109 private ActorRef masterDataBrokerRef = null;
110 private boolean connected = false;
112 public NetconfNodeManagerCallback(final String nodeId,
113 final String topologyId,
114 final ActorSystem actorSystem,
115 final NetconfTopology topologyDispatcher,
116 final RoleChangeStrategy roleChangeStrategy) {
117 this.nodeId = nodeId;
118 this.topologyId = topologyId;
119 this.actorSystem = actorSystem;
120 this.clusterExtension = Cluster.get(actorSystem);
121 this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
122 this.roleChangeStrategy = roleChangeStrategy;
124 final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
125 topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
127 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
128 if (throwable != null) {
129 LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
133 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
134 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
136 }, actorSystem.dispatcher());
138 final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
139 nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
141 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
142 if (throwable != null) {
143 LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
145 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
146 nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
148 }, actorSystem.dispatcher());
153 @Override public Node getInitialState(@Nonnull final NodeId nodeId,
154 @Nonnull final Node configNode) {
155 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
157 final Node initialNode = new NodeBuilder()
159 .addAugmentation(NetconfNode.class,
160 new NetconfNodeBuilder()
161 .setHost(netconfNode.getHost())
162 .setPort(netconfNode.getPort())
163 .setConnectionStatus(ConnectionStatus.Connecting)
164 .setClusteredConnectionStatus(
165 new ClusteredConnectionStatusBuilder()
168 new NodeStatusBuilder()
169 .setNode(clusterExtension.selfAddress().toString())
170 .setStatus(Status.Unavailable)
176 if (currentOperationalNode == null) {
177 currentOperationalNode = initialNode;
183 @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
184 @Nullable final Node configNode) {
185 final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
187 return new NodeBuilder()
189 .addAugmentation(NetconfNode.class,
190 new NetconfNodeBuilder()
191 .setHost(netconfNode.getHost())
192 .setPort(netconfNode.getPort())
193 .setConnectionStatus(ConnectionStatus.UnableToConnect)
194 .setClusteredConnectionStatus(
195 new ClusteredConnectionStatusBuilder()
197 Collections.singletonList(
198 new NodeStatusBuilder()
199 .setNode(clusterExtension.selfAddress().toString())
200 .setStatus(Status.Failed)
207 @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
208 @Nonnull final Node configNode) {
209 cachedContext = TypedActor.context();
210 this.nodeId = nodeId.getValue();
211 this.currentConfig = configNode;
212 // set initial state before anything happens
213 this.currentOperationalNode = getInitialState(nodeId, configNode);
215 // connect magic, send config into the netconf pipeline through topo dispatcher
216 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
218 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
220 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
221 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
225 public void onFailure(Throwable t) {
226 LOG.error("Connection to device failed", t);
230 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
232 // transform future result into state that gets written into datastore
233 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
236 public Node apply(NetconfDeviceCapabilities input) {
238 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
239 .addAugmentation(NetconfNode.class,
240 new NetconfNodeBuilder()
241 .setConnectionStatus(ConnectionStatus.Connected)
242 .setClusteredConnectionStatus(
243 new ClusteredConnectionStatusBuilder()
245 Collections.singletonList(
246 new NodeStatusBuilder()
247 .setNode(clusterExtension.selfAddress().toString())
248 .setStatus(Status.Connected)
251 .setHost(netconfNode.getHost())
252 .setPort(netconfNode.getPort())
253 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
254 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
256 return currentOperationalNode;
263 public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
264 @Nonnull final Node configNode) {
265 // first disconnect this node
266 topologyDispatcher.unregisterMountPoint(nodeId);
267 registration.close();
268 topologyDispatcher.disconnectNode(nodeId);
270 // now reinit this connection with new settings
271 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
273 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
275 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
276 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
280 public void onFailure(Throwable t) {
281 LOG.error("Connection to device failed", t);
285 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
287 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
290 public Node apply(NetconfDeviceCapabilities input) {
292 return new NodeBuilder()
294 .addAugmentation(NetconfNode.class,
295 new NetconfNodeBuilder()
296 .setConnectionStatus(ConnectionStatus.Connected)
297 .setClusteredConnectionStatus(
298 new ClusteredConnectionStatusBuilder()
300 Collections.singletonList(
301 new NodeStatusBuilder()
302 .setNode(clusterExtension.selfAddress().toString())
303 .setStatus(Status.Connected)
306 .setHost(netconfNode.getHost())
307 .setPort(netconfNode.getPort())
308 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
309 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
316 @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
317 // cleanup and disconnect
318 topologyDispatcher.unregisterMountPoint(nodeId);
319 registration.close();
320 roleChangeStrategy.unregisterRoleCandidate();
321 return topologyDispatcher.disconnectNode(nodeId);
326 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
327 LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
328 return Futures.immediateFuture(currentOperationalNode);
332 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
333 topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
335 isMaster = roleChangeDTO.isOwner();
337 LOG.warn("Gained ownership of node - registering master mount point");
338 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
340 // even though mount point is ready, we dont know who the master mount point will be since we havent received the announce msg
341 // after we receive the message we can go ahead and register the mount point
342 if (connected && masterDataBrokerRef != null) {
343 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
345 LOG.debug("Mount point is ready, still waiting for master mount point");
351 public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
352 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
353 LOG.debug("onDeviceConnected received, registering role candidate");
355 roleChangeStrategy.registerRoleCandidate(nodeManager);
356 if (!isMaster && masterDataBrokerRef != null) {
357 // if we're not master but one is present already, we need to register mountpoint
358 LOG.warn("Device connected, master already present in topology, registering mount point");
359 topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
361 List<String> capabilityList = new ArrayList<>();
362 capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
363 capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
364 final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
365 avCapabalitiesBuilder.setAvailableCapability(capabilityList);
367 final UnavailableCapabilities unavailableCapabilities =
368 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
369 .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
371 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
372 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
373 .addAugmentation(NetconfNode.class,
374 new NetconfNodeBuilder()
375 .setConnectionStatus(ConnectionStatus.Connected)
376 .setClusteredConnectionStatus(
377 new ClusteredConnectionStatusBuilder()
379 Collections.singletonList(
380 new NodeStatusBuilder()
381 .setNode(clusterExtension.selfAddress().toString())
382 .setStatus(Status.Connected)
385 .setHost(netconfNode.getHost())
386 .setPort(netconfNode.getPort())
387 .setAvailableCapabilities(avCapabalitiesBuilder.build())
388 .setUnavailableCapabilities(unavailableCapabilities)
391 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
395 public void onDeviceDisconnected() {
396 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
397 LOG.debug("onDeviceDisconnected received, unregistering role candidate");
400 // announce that master mount point is going down
401 for (final Member member : clusterExtension.state().getMembers()) {
402 actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
404 // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
406 // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
407 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
409 roleChangeStrategy.unregisterRoleCandidate();
411 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
412 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
413 .addAugmentation(NetconfNode.class,
414 new NetconfNodeBuilder()
415 .setConnectionStatus(ConnectionStatus.Connecting)
416 .setClusteredConnectionStatus(
417 new ClusteredConnectionStatusBuilder()
419 Collections.singletonList(
420 new NodeStatusBuilder()
421 .setNode(clusterExtension.selfAddress().toString())
422 .setStatus(Status.Unavailable)
425 .setHost(netconfNode.getHost())
426 .setPort(netconfNode.getPort())
428 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
432 public void onDeviceFailed(Throwable throwable) {
433 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
434 // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
435 LOG.debug("onDeviceFailed received");
437 String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
439 roleChangeStrategy.unregisterRoleCandidate();
440 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
441 .addAugmentation(NetconfNode.class,
442 new NetconfNodeBuilder()
443 .setConnectionStatus(ConnectionStatus.UnableToConnect)
444 .setClusteredConnectionStatus(
445 new ClusteredConnectionStatusBuilder()
447 Collections.singletonList(
448 new NodeStatusBuilder()
449 .setNode(clusterExtension.selfAddress().toString())
450 .setStatus(Status.Failed)
453 .setConnectedMessage(reason)
455 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
459 public void onNotification(DOMNotification domNotification) {
464 public void close() {
469 public void onReceive(Object message, ActorRef actorRef) {
470 LOG.warn("Netconf node callback received message {}", message);
471 if (message instanceof AnnounceMasterMountPoint) {
472 masterDataBrokerRef = actorRef;
473 // candidate gets registered when mount point is already prepared so we can go ahead a register it
474 if (roleChangeStrategy.isCandidateRegistered()) {
475 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
477 LOG.warn("Announce master mount point msg received but mount point is not ready yet");
479 } else if (message instanceof AnnounceMasterMountPointDown) {
480 LOG.warn("Master mountpoint went down");
481 masterDataBrokerRef = null;
482 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));