5f0eb2e179ffe910b1947375984d2e400833aae4
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfNodeManagerCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.cluster.Member;
18 import akka.dispatch.OnComplete;
19 import com.google.common.base.Function;
20 import com.google.common.collect.FluentIterable;
21 import com.google.common.collect.Lists;
22 import com.google.common.util.concurrent.FutureCallback;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.concurrent.TimeUnit;
30 import javax.annotation.Nonnull;
31 import javax.annotation.Nullable;
32 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
33 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
34 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
35 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
36 import org.opendaylight.netconf.topology.NetconfTopology;
37 import org.opendaylight.netconf.topology.NodeManager;
38 import org.opendaylight.netconf.topology.NodeManagerCallback;
39 import org.opendaylight.netconf.topology.RoleChangeStrategy;
40 import org.opendaylight.netconf.topology.TopologyManager;
41 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
42 import org.opendaylight.netconf.topology.util.BaseNodeManager;
43 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
44 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
45 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
61 import org.opendaylight.yangtools.yang.common.QName;
62 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.Future;
66 import scala.concurrent.duration.FiniteDuration;
67
68 public class NetconfNodeManagerCallback implements NodeManagerCallback{
69
70     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
71
72     public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
73         @Override
74         public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
75             return new UnavailableCapabilityBuilder()
76                     .setCapability(input.getKey().toString())
77                     .setFailureReason(input.getValue()).build();
78         }
79     };
80     public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
81         @Override
82         public String apply(QName qName) {
83             // intern string representation of a capability to avoid duplicates
84             return qName.toString().intern();
85         }
86     };
87
88     private static final String UNKNOWN_REASON = "Unknown reason";
89
90     private boolean isMaster = false;
91     private ClusteredNetconfTopology topologyDispatcher;
92     private final ActorSystem actorSystem;
93     private final Cluster clusterExtension;
94
95     private final RoleChangeStrategy roleChangeStrategy;
96
97     private String nodeId;
98     private String topologyId;
99     private TopologyManager topologyManager;
100     private NodeManager nodeManager;
101     // cached context so that we can use it in callbacks from topology
102     private ActorContext cachedContext;
103
104     private Node currentConfig;
105     private Node currentOperationalNode;
106
107     private ConnectionStatusListenerRegistration registration = null;
108
109     private ActorRef masterDataBrokerRef = null;
110     private boolean connected = false;
111
112     public NetconfNodeManagerCallback(final String nodeId,
113                                       final String topologyId,
114                                       final ActorSystem actorSystem,
115                                       final NetconfTopology topologyDispatcher,
116                                       final RoleChangeStrategy roleChangeStrategy) {
117         this.nodeId = nodeId;
118         this.topologyId = topologyId;
119         this.actorSystem = actorSystem;
120         this.clusterExtension = Cluster.get(actorSystem);
121         this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
122         this.roleChangeStrategy = roleChangeStrategy;
123
124         final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
125         topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
126             @Override
127             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
128                 if (throwable != null) {
129                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
130
131                 }
132
133                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
134                 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
135             }
136         }, actorSystem.dispatcher());
137
138         final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
139         nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
140             @Override
141             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
142                 if (throwable != null) {
143                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
144                 }
145                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
146                 nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
147             }
148         }, actorSystem.dispatcher());
149     }
150
151
152     @Nonnull
153     @Override public Node getInitialState(@Nonnull final NodeId nodeId,
154                                           @Nonnull final Node configNode) {
155         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
156
157         final Node initialNode = new NodeBuilder()
158                 .setNodeId(nodeId)
159                 .addAugmentation(NetconfNode.class,
160                         new NetconfNodeBuilder()
161                                 .setHost(netconfNode.getHost())
162                                 .setPort(netconfNode.getPort())
163                                 .setConnectionStatus(ConnectionStatus.Connecting)
164                                 .setClusteredConnectionStatus(
165                                         new ClusteredConnectionStatusBuilder()
166                                                 .setNodeStatus(
167                                                         Lists.newArrayList(
168                                                                 new NodeStatusBuilder()
169                                                                         .setNode(clusterExtension.selfAddress().toString())
170                                                                         .setStatus(Status.Unavailable)
171                                                                         .build()))
172                                                 .build())
173                                 .build())
174                 .build();
175
176         if (currentOperationalNode == null) {
177             currentOperationalNode = initialNode;
178         }
179
180         return initialNode;
181     }
182
183     @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
184                                                   @Nonnull final Node configNode) {
185         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
186
187         return new NodeBuilder()
188                 .setNodeId(nodeId)
189                 .addAugmentation(NetconfNode.class,
190                         new NetconfNodeBuilder()
191                                 .setHost(netconfNode.getHost())
192                                 .setPort(netconfNode.getPort())
193                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
194                                 .setClusteredConnectionStatus(
195                                         new ClusteredConnectionStatusBuilder()
196                                                 .setNodeStatus(
197                                                         Collections.singletonList(
198                                                                 new NodeStatusBuilder()
199                                                                         .setNode(clusterExtension.selfAddress().toString())
200                                                                         .setStatus(Status.Failed)
201                                                                         .build()))
202                                                 .build())
203                                 .build())
204                 .build();
205     }
206
207     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
208                                                                    @Nonnull final Node configNode) {
209         cachedContext = TypedActor.context();
210         this.nodeId = nodeId.getValue();
211         this.currentConfig = configNode;
212         // set initial state before anything happens
213         this.currentOperationalNode = getInitialState(nodeId, configNode);
214
215         // connect magic, send config into the netconf pipeline through topo dispatcher
216         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
217
218         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
219             @Override
220             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
221                 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
222             }
223
224             @Override
225             public void onFailure(Throwable t) {
226                 LOG.error("Connection to device failed", t);
227             }
228         });
229
230         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
231
232         // transform future result into state that gets written into datastore
233         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
234             @Nullable
235             @Override
236             public Node apply(NetconfDeviceCapabilities input) {
237                 // build state data
238                 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
239                         .addAugmentation(NetconfNode.class,
240                                 new NetconfNodeBuilder()
241                                         .setConnectionStatus(ConnectionStatus.Connected)
242                                         .setClusteredConnectionStatus(
243                                                 new ClusteredConnectionStatusBuilder()
244                                                         .setNodeStatus(
245                                                                 Collections.singletonList(
246                                                                         new NodeStatusBuilder()
247                                                                                 .setNode(clusterExtension.selfAddress().toString())
248                                                                                 .setStatus(Status.Connected)
249                                                                                 .build()))
250                                                         .build())
251                                         .setHost(netconfNode.getHost())
252                                         .setPort(netconfNode.getPort())
253                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
254                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
255                                         .build()).build();
256                 return currentOperationalNode;
257             }
258         });
259     }
260
261     @Nonnull
262     @Override
263     public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
264                                                 @Nonnull final Node configNode) {
265         // first disconnect this node
266         topologyDispatcher.unregisterMountPoint(nodeId);
267         registration.close();
268         topologyDispatcher.disconnectNode(nodeId);
269
270         // now reinit this connection with new settings
271         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
272
273         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
274             @Override
275             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
276                 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
277             }
278
279             @Override
280             public void onFailure(Throwable t) {
281                 LOG.error("Connection to device failed", t);
282             }
283         });
284
285         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
286
287         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
288             @Nullable
289             @Override
290             public Node apply(NetconfDeviceCapabilities input) {
291                 // build state data
292                 return new NodeBuilder()
293                         .setNodeId(nodeId)
294                         .addAugmentation(NetconfNode.class,
295                                 new NetconfNodeBuilder()
296                                         .setConnectionStatus(ConnectionStatus.Connected)
297                                         .setClusteredConnectionStatus(
298                                                 new ClusteredConnectionStatusBuilder()
299                                                         .setNodeStatus(
300                                                                 Collections.singletonList(
301                                                                         new NodeStatusBuilder()
302                                                                                 .setNode(clusterExtension.selfAddress().toString())
303                                                                                 .setStatus(Status.Connected)
304                                                                                 .build()))
305                                                         .build())
306                                         .setHost(netconfNode.getHost())
307                                         .setPort(netconfNode.getPort())
308                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
309                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
310                                         .build())
311                         .build();
312             }
313         });
314     }
315
316     @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
317         // cleanup and disconnect
318         topologyDispatcher.unregisterMountPoint(nodeId);
319         registration.close();
320         roleChangeStrategy.unregisterRoleCandidate();
321         return topologyDispatcher.disconnectNode(nodeId);
322     }
323
324     @Nonnull
325     @Override
326     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
327         LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
328         return Futures.immediateFuture(currentOperationalNode);
329     }
330
331     @Override
332     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
333         topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
334
335         isMaster = roleChangeDTO.isOwner();
336         if (isMaster) {
337             LOG.warn("Gained ownership of node - registering master mount point");
338             topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
339         } else {
340             // even though mount point is ready, we dont know who the master mount point will be since we havent received the announce msg
341             // after we receive the message we can go ahead and register the mount point
342             if (connected && masterDataBrokerRef != null) {
343                 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
344             } else {
345                 LOG.debug("Mount point is ready, still waiting for master mount point");
346             }
347         }
348     }
349
350     @Override
351     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
352         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
353         LOG.debug("onDeviceConnected received, registering role candidate");
354         connected = true;
355         roleChangeStrategy.registerRoleCandidate(nodeManager);
356         if (!isMaster && masterDataBrokerRef != null) {
357             // if we're not master but one is present already, we need to register mountpoint
358             LOG.warn("Device connected, master already present in topology, registering mount point");
359             topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
360         }
361         List<String> capabilityList = new ArrayList<>();
362         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
363         capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
364         final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
365         avCapabalitiesBuilder.setAvailableCapability(capabilityList);
366
367         final UnavailableCapabilities unavailableCapabilities =
368                 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
369                         .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
370
371         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
372         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
373                 .addAugmentation(NetconfNode.class,
374                         new NetconfNodeBuilder()
375                                 .setConnectionStatus(ConnectionStatus.Connected)
376                                 .setClusteredConnectionStatus(
377                                         new ClusteredConnectionStatusBuilder()
378                                                 .setNodeStatus(
379                                                         Collections.singletonList(
380                                                                 new NodeStatusBuilder()
381                                                                         .setNode(clusterExtension.selfAddress().toString())
382                                                                         .setStatus(Status.Connected)
383                                                                         .build()))
384                                                 .build())
385                                 .setHost(netconfNode.getHost())
386                                 .setPort(netconfNode.getPort())
387                                 .setAvailableCapabilities(avCapabalitiesBuilder.build())
388                                 .setUnavailableCapabilities(unavailableCapabilities)
389                                 .build())
390                 .build();
391         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
392     }
393
394     @Override
395     public void onDeviceDisconnected() {
396         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
397         LOG.debug("onDeviceDisconnected received, unregistering role candidate");
398         connected = false;
399         if (isMaster) {
400             // announce that master mount point is going down
401             for (final Member member : clusterExtension.state().getMembers()) {
402                 actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
403             }
404             // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
405             isMaster = false;
406             // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
407             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
408         }
409         roleChangeStrategy.unregisterRoleCandidate();
410
411         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
412         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
413                 .addAugmentation(NetconfNode.class,
414                         new NetconfNodeBuilder()
415                                 .setConnectionStatus(ConnectionStatus.Connecting)
416                                 .setClusteredConnectionStatus(
417                                         new ClusteredConnectionStatusBuilder()
418                                                 .setNodeStatus(
419                                                         Collections.singletonList(
420                                                                 new NodeStatusBuilder()
421                                                                         .setNode(clusterExtension.selfAddress().toString())
422                                                                         .setStatus(Status.Unavailable)
423                                                                         .build()))
424                                                 .build())
425                                 .setHost(netconfNode.getHost())
426                                 .setPort(netconfNode.getPort())
427                                 .build()).build();
428         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
429     }
430
431     @Override
432     public void onDeviceFailed(Throwable throwable) {
433         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
434         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
435         LOG.debug("onDeviceFailed received");
436         connected = false;
437         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
438
439         roleChangeStrategy.unregisterRoleCandidate();
440         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
441                 .addAugmentation(NetconfNode.class,
442                         new NetconfNodeBuilder()
443                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
444                                 .setClusteredConnectionStatus(
445                                         new ClusteredConnectionStatusBuilder()
446                                                 .setNodeStatus(
447                                                         Collections.singletonList(
448                                                                 new NodeStatusBuilder()
449                                                                         .setNode(clusterExtension.selfAddress().toString())
450                                                                         .setStatus(Status.Failed)
451                                                                         .build()))
452                                                 .build())
453                                 .setConnectedMessage(reason)
454                                 .build()).build();
455         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
456     }
457
458     @Override
459     public void onNotification(DOMNotification domNotification) {
460         //NOOP
461     }
462
463     @Override
464     public void close() {
465         //NOOP
466     }
467
468     @Override
469     public void onReceive(Object message, ActorRef actorRef) {
470         LOG.warn("Netconf node callback received message {}", message);
471         if (message instanceof AnnounceMasterMountPoint) {
472             masterDataBrokerRef = actorRef;
473             // candidate gets registered when mount point is already prepared so we can go ahead a register it
474             if (roleChangeStrategy.isCandidateRegistered()) {
475                 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
476             } else {
477                 LOG.warn("Announce master mount point msg received but mount point is not ready yet");
478             }
479         } else if (message instanceof AnnounceMasterMountPointDown) {
480             LOG.warn("Master mountpoint went down");
481             masterDataBrokerRef = null;
482             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
483         }
484     }
485 }