2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.dispatch.OnComplete;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
34 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
35 import org.opendaylight.netconf.topology.NetconfTopology;
36 import org.opendaylight.netconf.topology.NodeManager;
37 import org.opendaylight.netconf.topology.NodeManagerCallback;
38 import org.opendaylight.netconf.topology.RoleChangeStrategy;
39 import org.opendaylight.netconf.topology.TopologyManager;
40 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
41 import org.opendaylight.netconf.topology.util.BaseNodeManager;
42 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
43 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
44 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
60 import org.opendaylight.yangtools.yang.common.QName;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.Future;
65 import scala.concurrent.duration.FiniteDuration;
67 public class NetconfNodeManagerCallback implements NodeManagerCallback{
69 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
71 public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
73 public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
74 return new UnavailableCapabilityBuilder()
75 .setCapability(input.getKey().toString())
76 .setFailureReason(input.getValue()).build();
79 public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
81 public String apply(QName qName) {
82 // intern string representation of a capability to avoid duplicates
83 return qName.toString().intern();
87 private static final String UNKNOWN_REASON = "Unknown reason";
89 private boolean isMaster = false;
90 private ClusteredNetconfTopology topologyDispatcher;
91 private final ActorSystem actorSystem;
92 private final Cluster clusterExtension;
94 private final RoleChangeStrategy roleChangeStrategy;
96 private String nodeId;
97 private String topologyId;
98 private TopologyManager topologyManager;
99 private NodeManager nodeManager;
100 // cached context so that we can use it in callbacks from topology
101 private ActorContext cachedContext;
103 private Node currentConfig;
104 private Node currentOperationalNode;
106 private ConnectionStatusListenerRegistration registration = null;
108 private ActorRef masterDataBrokerRef = null;
109 private boolean connected = false;
/**
 * Creates the callback and asynchronously resolves the typed-actor proxies for the
 * topology manager ({@code /user/<topologyId>}) and this node's manager
 * ({@code /user/<topologyId>/<nodeId>}). Resolution has a 10 second timeout; on failure
 * only a warning is logged and the corresponding proxy field stays null.
 *
 * @param nodeId             id of the managed netconf node
 * @param topologyId         id of the enclosing topology (also the actor path segment)
 * @param actorSystem        actor system used for resolution and dispatching
 * @param topologyDispatcher must be a {@link ClusteredNetconfTopology}; cast unconditionally
 * @param roleChangeStrategy strategy handling master election for this node
 */
public NetconfNodeManagerCallback(final String nodeId,
                                  final String topologyId,
                                  final ActorSystem actorSystem,
                                  final NetconfTopology topologyDispatcher,
                                  final RoleChangeStrategy roleChangeStrategy) {
    this.nodeId = nodeId;
    this.topologyId = topologyId;
    this.actorSystem = actorSystem;
    this.clusterExtension = Cluster.get(actorSystem);
    this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
    this.roleChangeStrategy = roleChangeStrategy;

    final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId)
            .resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
    topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
        @Override
        public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
            if (throwable != null) {
                LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
            } else {
                LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
                topologyManager = TypedActor.get(actorSystem).typedActorOf(
                        new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
            }
        }
    }, actorSystem.dispatcher());

    final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId)
            .resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
    nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
        @Override
        public void onComplete(final Throwable throwable, final ActorRef actorRef) throws Throwable {
            if (throwable != null) {
                LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
            } else {
                // FIX: log the node path that was actually resolved; previously this logged only the
                // topology path, making the two resolution callbacks indistinguishable in the logs.
                LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId + "/" + nodeId);
                nodeManager = TypedActor.get(actorSystem).typedActorOf(
                        new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
            }
        }
    }, actorSystem.dispatcher());
}
152 @Override public Node getInitialState(@Nonnull final NodeId nodeId,
153 @Nonnull final Node configNode) {
154 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
156 final Node initialNode = new NodeBuilder()
158 .addAugmentation(NetconfNode.class,
159 new NetconfNodeBuilder()
160 .setHost(netconfNode.getHost())
161 .setPort(netconfNode.getPort())
162 .setConnectionStatus(ConnectionStatus.Connecting)
163 .setClusteredConnectionStatus(
164 new ClusteredConnectionStatusBuilder()
167 new NodeStatusBuilder()
168 .setNode(clusterExtension.selfAddress().toString())
169 .setStatus(Status.Unavailable)
175 if (currentOperationalNode == null) {
176 currentOperationalNode = initialNode;
182 @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
183 @Nullable final Node configNode) {
184 final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
186 final Node failedNode = new NodeBuilder()
188 .addAugmentation(NetconfNode.class,
189 new NetconfNodeBuilder()
190 .setHost(netconfNode.getHost())
191 .setPort(netconfNode.getPort())
192 .setConnectionStatus(ConnectionStatus.UnableToConnect)
193 .setClusteredConnectionStatus(
194 new ClusteredConnectionStatusBuilder()
196 Collections.singletonList(
197 new NodeStatusBuilder()
198 .setNode(clusterExtension.selfAddress().toString())
199 .setStatus(Status.Failed)
205 if (currentOperationalNode == null) {
206 currentOperationalNode = failedNode;
212 @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
213 @Nonnull final Node configNode) {
214 cachedContext = TypedActor.context();
215 this.nodeId = nodeId.getValue();
216 this.currentConfig = configNode;
217 // set initial state before anything happens
218 this.currentOperationalNode = getInitialState(nodeId, configNode);
220 // connect magic, send config into the netconf pipeline through topo dispatcher
221 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
223 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
225 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
226 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
230 public void onFailure(Throwable t) {
231 LOG.error("Connection to device failed", t);
235 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
237 // transform future result into state that gets written into datastore
238 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
241 public Node apply(NetconfDeviceCapabilities input) {
243 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
244 .addAugmentation(NetconfNode.class,
245 new NetconfNodeBuilder()
246 .setConnectionStatus(ConnectionStatus.Connected)
247 .setClusteredConnectionStatus(
248 new ClusteredConnectionStatusBuilder()
250 Collections.singletonList(
251 new NodeStatusBuilder()
252 .setNode(clusterExtension.selfAddress().toString())
253 .setStatus(Status.Connected)
256 .setHost(netconfNode.getHost())
257 .setPort(netconfNode.getPort())
258 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
259 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
261 return currentOperationalNode;
268 public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
269 @Nonnull final Node configNode) {
270 // first disconnect this node
271 topologyDispatcher.unregisterMountPoint(nodeId);
272 if (registration != null) {
273 registration.close();
275 topologyDispatcher.disconnectNode(nodeId);
277 // now reinit this connection with new settings
278 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
280 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
282 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
283 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
287 public void onFailure(Throwable t) {
288 LOG.error("Connection to device failed", t);
292 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
294 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
297 public Node apply(NetconfDeviceCapabilities input) {
299 return new NodeBuilder()
301 .addAugmentation(NetconfNode.class,
302 new NetconfNodeBuilder()
303 .setConnectionStatus(ConnectionStatus.Connected)
304 .setClusteredConnectionStatus(
305 new ClusteredConnectionStatusBuilder()
307 Collections.singletonList(
308 new NodeStatusBuilder()
309 .setNode(clusterExtension.selfAddress().toString())
310 .setStatus(Status.Connected)
313 .setHost(netconfNode.getHost())
314 .setPort(netconfNode.getPort())
315 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
316 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
323 @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
324 // cleanup and disconnect
325 topologyDispatcher.unregisterMountPoint(nodeId);
326 if (registration != null) {
327 registration.close();
329 roleChangeStrategy.unregisterRoleCandidate();
330 return topologyDispatcher.disconnectNode(nodeId);
335 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
336 LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
337 return Futures.immediateFuture(currentOperationalNode);
341 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
342 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
344 isMaster = roleChangeDTO.isOwner();
346 LOG.warn("Gained ownership of node - registering master mount point");
347 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
349 // even though mount point is ready, we dont know who the master mount point will be since we havent received the announce msg
350 // after we receive the message we can go ahead and register the mount point
351 if (connected && masterDataBrokerRef != null) {
352 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
354 LOG.debug("Mount point is ready, still waiting for master mount point");
360 public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
361 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
362 LOG.debug("onDeviceConnected received, registering role candidate");
364 roleChangeStrategy.registerRoleCandidate(nodeManager);
365 if (!isMaster && masterDataBrokerRef != null) {
366 // if we're not master but one is present already, we need to register mountpoint
367 LOG.warn("Device connected, master already present in topology, registering mount point");
368 topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
370 List<String> capabilityList = new ArrayList<>();
371 capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
372 capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
373 final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
374 avCapabalitiesBuilder.setAvailableCapability(capabilityList);
376 final UnavailableCapabilities unavailableCapabilities =
377 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
378 .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
380 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
381 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
382 .addAugmentation(NetconfNode.class,
383 new NetconfNodeBuilder()
384 .setConnectionStatus(ConnectionStatus.Connected)
385 .setClusteredConnectionStatus(
386 new ClusteredConnectionStatusBuilder()
388 Collections.singletonList(
389 new NodeStatusBuilder()
390 .setNode(clusterExtension.selfAddress().toString())
391 .setStatus(Status.Connected)
394 .setHost(netconfNode.getHost())
395 .setPort(netconfNode.getPort())
396 .setAvailableCapabilities(avCapabalitiesBuilder.build())
397 .setUnavailableCapabilities(unavailableCapabilities)
400 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
404 public void onDeviceDisconnected() {
405 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
406 LOG.debug("onDeviceDisconnected received, unregistering role candidate");
409 // announce that master mount point is going down
410 // for (final Member member : clusterExtension.state().getMembers()) {
411 // actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
413 // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
415 // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
416 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
418 roleChangeStrategy.unregisterRoleCandidate();
420 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
421 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
422 .addAugmentation(NetconfNode.class,
423 new NetconfNodeBuilder()
424 .setConnectionStatus(ConnectionStatus.Connecting)
425 .setClusteredConnectionStatus(
426 new ClusteredConnectionStatusBuilder()
428 Collections.singletonList(
429 new NodeStatusBuilder()
430 .setNode(clusterExtension.selfAddress().toString())
431 .setStatus(Status.Unavailable)
434 .setHost(netconfNode.getHost())
435 .setPort(netconfNode.getPort())
437 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
441 public void onDeviceFailed(Throwable throwable) {
442 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
443 // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
444 LOG.debug("onDeviceFailed received");
446 String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
448 roleChangeStrategy.unregisterRoleCandidate();
449 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
450 .addAugmentation(NetconfNode.class,
451 new NetconfNodeBuilder()
452 .setConnectionStatus(ConnectionStatus.UnableToConnect)
453 .setClusteredConnectionStatus(
454 new ClusteredConnectionStatusBuilder()
456 Collections.singletonList(
457 new NodeStatusBuilder()
458 .setNode(clusterExtension.selfAddress().toString())
459 .setStatus(Status.Failed)
462 .setConnectedMessage(reason)
464 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
468 public void onNotification(DOMNotification domNotification) {
473 public void close() {
478 public void onReceive(Object message, ActorRef actorRef) {
479 LOG.warn("Netconf node callback received message {}", message);
480 if (message instanceof AnnounceMasterMountPoint) {
481 masterDataBrokerRef = actorRef;
482 // candidate gets registered when mount point is already prepared so we can go ahead a register it
483 if (roleChangeStrategy.isCandidateRegistered()) {
484 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
486 LOG.warn("Announce master mount point msg received but mount point is not ready yet");
488 } else if (message instanceof AnnounceMasterMountPointDown) {
489 LOG.warn("Master mountpoint went down");
490 masterDataBrokerRef = null;
491 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));