2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.impl;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.TypedActor;
14 import akka.actor.TypedProps;
15 import akka.cluster.Cluster;
16 import akka.dispatch.OnComplete;
17 import com.google.common.base.Function;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map.Entry;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
32 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
34 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
35 import org.opendaylight.netconf.topology.NetconfTopology;
36 import org.opendaylight.netconf.topology.NodeManagerCallback;
37 import org.opendaylight.netconf.topology.RoleChangeStrategy;
38 import org.opendaylight.netconf.topology.TopologyManager;
39 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
40 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
56 import org.opendaylight.yangtools.yang.common.QName;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Future;
61 import scala.concurrent.duration.FiniteDuration;
63 public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler<NetconfSessionPreferences>{
65 private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
67 public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
69 public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
70 return new UnavailableCapabilityBuilder()
71 .setCapability(input.getKey().toString())
72 .setFailureReason(input.getValue()).build();
75 public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
77 public String apply(QName qName) {
78 // intern string representation of a capability to avoid duplicates
79 return qName.toString().intern();
83 private static final String UNKNOWN_REASON = "Unknown reason";
85 private boolean isMaster = false;
86 private ClusteredNetconfTopology topologyDispatcher;
87 private final ActorSystem actorSystem;
88 private final Cluster clusterExtension;
90 private final RoleChangeStrategy roleChangeStrategy;
92 private String nodeId;
93 private String topologyId;
94 private TopologyManager topologyManager;
96 private Node currentConfig;
97 private Node currentOperationalNode;
99 private ConnectionStatusListenerRegistration registration = null;
101 public NetconfNodeManagerCallback(final String nodeId,
102 final String topologyId,
103 final ActorSystem actorSystem,
104 final NetconfTopology topologyDispatcher,
105 final RoleChangeStrategy roleChangeStrategy) {
106 this.nodeId = nodeId;
107 this.topologyId = topologyId;
108 this.actorSystem = actorSystem;
109 this.clusterExtension = Cluster.get(actorSystem);
110 this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
111 this.roleChangeStrategy = roleChangeStrategy;
113 final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
114 topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
116 public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
117 if (throwable != null) {
118 LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
122 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
123 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
125 }, actorSystem.dispatcher());
130 @Override public Node getInitialState(@Nonnull final NodeId nodeId,
131 @Nonnull final Node configNode) {
132 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
134 final Node initialNode = new NodeBuilder()
136 .addAugmentation(NetconfNode.class,
137 new NetconfNodeBuilder()
138 .setHost(netconfNode.getHost())
139 .setPort(netconfNode.getPort())
140 .setConnectionStatus(ConnectionStatus.Connecting)
141 .setClusteredConnectionStatus(
142 new ClusteredConnectionStatusBuilder()
145 new NodeStatusBuilder()
146 .setNode(clusterExtension.selfAddress().toString())
147 .setStatus(Status.Unavailable)
153 if (currentOperationalNode == null) {
154 currentOperationalNode = initialNode;
160 @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
161 @Nonnull final Node configNode) {
162 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
164 return new NodeBuilder()
166 .addAugmentation(NetconfNode.class,
167 new NetconfNodeBuilder()
168 .setHost(netconfNode.getHost())
169 .setPort(netconfNode.getPort())
170 .setConnectionStatus(ConnectionStatus.UnableToConnect)
171 .setClusteredConnectionStatus(
172 new ClusteredConnectionStatusBuilder()
174 Collections.singletonList(
175 new NodeStatusBuilder()
176 .setNode(clusterExtension.selfAddress().toString())
177 .setStatus(Status.Failed)
184 @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
185 @Nonnull final Node configNode) {
186 this.nodeId = nodeId.getValue();
187 this.currentConfig = configNode;
188 // set initial state before anything happens
189 this.currentOperationalNode = getInitialState(nodeId, configNode);
191 // connect magic, send config into the netconf pipeline through topo dispatcher
192 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
194 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
196 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
197 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
201 public void onFailure(Throwable t) {
202 LOG.error("Connection to device failed", t);
206 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
208 // transform future result into state that gets written into datastore
209 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
212 public Node apply(NetconfDeviceCapabilities input) {
214 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
215 .addAugmentation(NetconfNode.class,
216 new NetconfNodeBuilder()
217 .setConnectionStatus(ConnectionStatus.Connected)
218 .setClusteredConnectionStatus(
219 new ClusteredConnectionStatusBuilder()
221 Collections.singletonList(
222 new NodeStatusBuilder()
223 .setNode(clusterExtension.selfAddress().toString())
224 .setStatus(Status.Connected)
227 .setHost(netconfNode.getHost())
228 .setPort(netconfNode.getPort())
229 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
230 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
232 return currentOperationalNode;
239 public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
240 @Nonnull final Node configNode) {
241 // first disconnect this node
242 topologyDispatcher.unregisterMountPoint(nodeId);
243 registration.close();
244 topologyDispatcher.disconnectNode(nodeId);
246 // now reinit this connection with new settings
247 final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
249 Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
251 public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
252 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
256 public void onFailure(Throwable t) {
257 LOG.error("Connection to device failed", t);
261 final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
263 return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
266 public Node apply(NetconfDeviceCapabilities input) {
268 return new NodeBuilder()
270 .addAugmentation(NetconfNode.class,
271 new NetconfNodeBuilder()
272 .setConnectionStatus(ConnectionStatus.Connected)
273 .setClusteredConnectionStatus(
274 new ClusteredConnectionStatusBuilder()
276 Collections.singletonList(
277 new NodeStatusBuilder()
278 .setNode(clusterExtension.selfAddress().toString())
279 .setStatus(Status.Connected)
282 .setHost(netconfNode.getHost())
283 .setPort(netconfNode.getPort())
284 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
285 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
292 @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
293 // cleanup and disconnect
294 topologyDispatcher.unregisterMountPoint(nodeId);
295 registration.close();
296 roleChangeStrategy.unregisterRoleCandidate();
297 return topologyDispatcher.disconnectNode(nodeId);
302 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
303 LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
304 return Futures.immediateFuture(currentOperationalNode);
308 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
309 if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) {
312 isMaster = roleChangeDTO.isOwner();
313 //TODO instead of registering mount point, init remote schema repo when its done
315 // unregister old mountPoint if ownership changed, register a new one
316 topologyDispatcher.registerMountPoint(new NodeId(nodeId));
318 topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
323 public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
324 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
325 LOG.debug("onDeviceConnected received, registering role candidate");
326 roleChangeStrategy.registerRoleCandidate(this);
327 List<String> capabilityList = new ArrayList<>();
328 capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
329 capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
330 final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
331 avCapabalitiesBuilder.setAvailableCapability(capabilityList);
333 final UnavailableCapabilities unavailableCapabilities =
334 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
335 .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
337 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
338 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
339 .addAugmentation(NetconfNode.class,
340 new NetconfNodeBuilder()
341 .setConnectionStatus(ConnectionStatus.Connected)
342 .setClusteredConnectionStatus(
343 new ClusteredConnectionStatusBuilder()
345 Collections.singletonList(
346 new NodeStatusBuilder()
347 .setNode(clusterExtension.selfAddress().toString())
348 .setStatus(Status.Connected)
351 .setHost(netconfNode.getHost())
352 .setPort(netconfNode.getPort())
353 .setAvailableCapabilities(avCapabalitiesBuilder.build())
354 .setUnavailableCapabilities(unavailableCapabilities)
357 // TODO need to implement forwarding of this msg to master
358 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
362 public void onDeviceDisconnected() {
363 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
364 // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
365 LOG.debug("onDeviceDisconnected received, unregistering role candidate");
366 topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
367 roleChangeStrategy.unregisterRoleCandidate();
368 final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
369 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
370 .addAugmentation(NetconfNode.class,
371 new NetconfNodeBuilder()
372 .setConnectionStatus(ConnectionStatus.Connecting)
373 .setClusteredConnectionStatus(
374 new ClusteredConnectionStatusBuilder()
376 Collections.singletonList(
377 new NodeStatusBuilder()
378 .setNode(clusterExtension.selfAddress().toString())
379 .setStatus(Status.Unavailable)
382 .setHost(netconfNode.getHost())
383 .setPort(netconfNode.getPort())
385 // TODO need to implement forwarding of this msg to master
386 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
390 public void onDeviceFailed(Throwable throwable) {
391 // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
392 // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
393 LOG.debug("onDeviceFailed received");
394 String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
396 roleChangeStrategy.unregisterRoleCandidate();
397 currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
398 .addAugmentation(NetconfNode.class,
399 new NetconfNodeBuilder()
400 .setConnectionStatus(ConnectionStatus.UnableToConnect)
401 .setClusteredConnectionStatus(
402 new ClusteredConnectionStatusBuilder()
404 Collections.singletonList(
405 new NodeStatusBuilder()
406 .setNode(clusterExtension.selfAddress().toString())
407 .setStatus(Status.Failed)
410 .setConnectedMessage(reason)
412 topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
417 public void onNotification(DOMNotification domNotification) {
422 public void close() {
427 public void onReceive(Object o, ActorRef actorRef) {