a17650a6fb0403a30eeb33f445dd671682c548e2
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfNodeManagerCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.TypedActor;
14 import akka.actor.TypedProps;
15 import akka.cluster.Cluster;
16 import akka.dispatch.OnComplete;
17 import com.google.common.base.Function;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
26 import java.util.Map.Entry;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.Nonnull;
29 import javax.annotation.Nullable;
30 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
32 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
34 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
35 import org.opendaylight.netconf.topology.NetconfTopology;
36 import org.opendaylight.netconf.topology.NodeManagerCallback;
37 import org.opendaylight.netconf.topology.RoleChangeStrategy;
38 import org.opendaylight.netconf.topology.TopologyManager;
39 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
40 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
53 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
54 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
56 import org.opendaylight.yangtools.yang.common.QName;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Future;
61 import scala.concurrent.duration.FiniteDuration;
62
63 public class NetconfNodeManagerCallback implements NodeManagerCallback, RemoteDeviceHandler<NetconfSessionPreferences>{
64
    /** Logger shared by all instances of this callback. */
    private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
66
67     public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
68         @Override
69         public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
70             return new UnavailableCapabilityBuilder()
71                     .setCapability(input.getKey().toString())
72                     .setFailureReason(input.getValue()).build();
73         }
74     };
75     public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
76         @Override
77         public String apply(QName qName) {
78             // intern string representation of a capability to avoid duplicates
79             return qName.toString().intern();
80         }
81     };
82
83     private static final String UNKNOWN_REASON = "Unknown reason";
84
85     private boolean isMaster = false;
86     private ClusteredNetconfTopology topologyDispatcher;
87     private final ActorSystem actorSystem;
88     private final Cluster clusterExtension;
89
90     private final RoleChangeStrategy roleChangeStrategy;
91
92     private String nodeId;
93     private String topologyId;
94     private TopologyManager topologyManager;
95
96     private Node currentConfig;
97     private Node currentOperationalNode;
98
99     private ConnectionStatusListenerRegistration registration = null;
100
101     public NetconfNodeManagerCallback(final String nodeId,
102                                       final String topologyId,
103                                       final ActorSystem actorSystem,
104                                       final NetconfTopology topologyDispatcher,
105                                       final RoleChangeStrategy roleChangeStrategy) {
106         this.nodeId = nodeId;
107         this.topologyId = topologyId;
108         this.actorSystem = actorSystem;
109         this.clusterExtension = Cluster.get(actorSystem);
110         this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
111         this.roleChangeStrategy = roleChangeStrategy;
112
113         final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
114         topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
115             @Override
116             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
117                 if (throwable != null) {
118                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
119
120                 }
121
122                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
123                 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
124             }
125         }, actorSystem.dispatcher());
126     }
127
128
129     @Nonnull
130     @Override public Node getInitialState(@Nonnull final NodeId nodeId,
131                                           @Nonnull final Node configNode) {
132         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
133
134         final Node initialNode = new NodeBuilder()
135                 .setNodeId(nodeId)
136                 .addAugmentation(NetconfNode.class,
137                         new NetconfNodeBuilder()
138                                 .setHost(netconfNode.getHost())
139                                 .setPort(netconfNode.getPort())
140                                 .setConnectionStatus(ConnectionStatus.Connecting)
141                                 .setClusteredConnectionStatus(
142                                         new ClusteredConnectionStatusBuilder()
143                                                 .setNodeStatus(
144                                                         Lists.newArrayList(
145                                                                 new NodeStatusBuilder()
146                                                                         .setNode(clusterExtension.selfAddress().toString())
147                                                                         .setStatus(Status.Unavailable)
148                                                                         .build()))
149                                                 .build())
150                                 .build())
151                 .build();
152
153         if (currentOperationalNode == null) {
154             currentOperationalNode = initialNode;
155         }
156
157         return initialNode;
158     }
159
160     @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
161                                                   @Nonnull final Node configNode) {
162         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
163
164         return new NodeBuilder()
165                 .setNodeId(nodeId)
166                 .addAugmentation(NetconfNode.class,
167                         new NetconfNodeBuilder()
168                                 .setHost(netconfNode.getHost())
169                                 .setPort(netconfNode.getPort())
170                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
171                                 .setClusteredConnectionStatus(
172                                         new ClusteredConnectionStatusBuilder()
173                                                 .setNodeStatus(
174                                                         Collections.singletonList(
175                                                                 new NodeStatusBuilder()
176                                                                         .setNode(clusterExtension.selfAddress().toString())
177                                                                         .setStatus(Status.Failed)
178                                                                         .build()))
179                                                 .build())
180                                 .build())
181                 .build();
182     }
183
184     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
185                                                                    @Nonnull final Node configNode) {
186         this.nodeId = nodeId.getValue();
187         this.currentConfig = configNode;
188         // set initial state before anything happens
189         this.currentOperationalNode = getInitialState(nodeId, configNode);
190
191         // connect magic, send config into the netconf pipeline through topo dispatcher
192         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
193
194         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
195             @Override
196             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
197                 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
198             }
199
200             @Override
201             public void onFailure(Throwable t) {
202                 LOG.error("Connection to device failed", t);
203             }
204         });
205
206         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
207
208         // transform future result into state that gets written into datastore
209         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
210             @Nullable
211             @Override
212             public Node apply(NetconfDeviceCapabilities input) {
213                 // build state data
214                 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
215                         .addAugmentation(NetconfNode.class,
216                                 new NetconfNodeBuilder()
217                                         .setConnectionStatus(ConnectionStatus.Connected)
218                                         .setClusteredConnectionStatus(
219                                                 new ClusteredConnectionStatusBuilder()
220                                                         .setNodeStatus(
221                                                                 Collections.singletonList(
222                                                                         new NodeStatusBuilder()
223                                                                                 .setNode(clusterExtension.selfAddress().toString())
224                                                                                 .setStatus(Status.Connected)
225                                                                                 .build()))
226                                                         .build())
227                                         .setHost(netconfNode.getHost())
228                                         .setPort(netconfNode.getPort())
229                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
230                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
231                                         .build()).build();
232                 return currentOperationalNode;
233             }
234         });
235     }
236
237     @Nonnull
238     @Override
239     public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
240                                                 @Nonnull final Node configNode) {
241         // first disconnect this node
242         topologyDispatcher.unregisterMountPoint(nodeId);
243         registration.close();
244         topologyDispatcher.disconnectNode(nodeId);
245
246         // now reinit this connection with new settings
247         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
248
249         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
250             @Override
251             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
252                 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
253             }
254
255             @Override
256             public void onFailure(Throwable t) {
257                 LOG.error("Connection to device failed", t);
258             }
259         });
260
261         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
262
263         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
264             @Nullable
265             @Override
266             public Node apply(NetconfDeviceCapabilities input) {
267                 // build state data
268                 return new NodeBuilder()
269                         .setNodeId(nodeId)
270                         .addAugmentation(NetconfNode.class,
271                                 new NetconfNodeBuilder()
272                                         .setConnectionStatus(ConnectionStatus.Connected)
273                                         .setClusteredConnectionStatus(
274                                                 new ClusteredConnectionStatusBuilder()
275                                                         .setNodeStatus(
276                                                                 Collections.singletonList(
277                                                                         new NodeStatusBuilder()
278                                                                                 .setNode(clusterExtension.selfAddress().toString())
279                                                                                 .setStatus(Status.Connected)
280                                                                                 .build()))
281                                                         .build())
282                                         .setHost(netconfNode.getHost())
283                                         .setPort(netconfNode.getPort())
284                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
285                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
286                                         .build())
287                                 .build();
288             }
289         });
290     }
291
292     @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
293         // cleanup and disconnect
294         topologyDispatcher.unregisterMountPoint(nodeId);
295         registration.close();
296         roleChangeStrategy.unregisterRoleCandidate();
297         return topologyDispatcher.disconnectNode(nodeId);
298     }
299
300     @Nonnull
301     @Override
302     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
303         LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
304         return Futures.immediateFuture(currentOperationalNode);
305     }
306
307     @Override
308     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
309         if (roleChangeDTO.isOwner() && roleChangeDTO.wasOwner()) {
310             return;
311         }
312         isMaster = roleChangeDTO.isOwner();
313         //TODO instead of registering mount point, init remote schema repo when its done
314         if (isMaster) {
315             // unregister old mountPoint if ownership changed, register a new one
316             topologyDispatcher.registerMountPoint(new NodeId(nodeId));
317         } else {
318             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
319         }
320     }
321
322     @Override
323     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
324         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
325         LOG.debug("onDeviceConnected received, registering role candidate");
326         roleChangeStrategy.registerRoleCandidate(this);
327         List<String> capabilityList = new ArrayList<>();
328         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
329         capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
330         final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
331         avCapabalitiesBuilder.setAvailableCapability(capabilityList);
332
333         final UnavailableCapabilities unavailableCapabilities =
334                 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
335                         .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
336
337         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
338         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
339                 .addAugmentation(NetconfNode.class,
340                         new NetconfNodeBuilder()
341                                 .setConnectionStatus(ConnectionStatus.Connected)
342                                 .setClusteredConnectionStatus(
343                                         new ClusteredConnectionStatusBuilder()
344                                                 .setNodeStatus(
345                                                         Collections.singletonList(
346                                                                 new NodeStatusBuilder()
347                                                                         .setNode(clusterExtension.selfAddress().toString())
348                                                                         .setStatus(Status.Connected)
349                                                                         .build()))
350                                                 .build())
351                                 .setHost(netconfNode.getHost())
352                                 .setPort(netconfNode.getPort())
353                                 .setAvailableCapabilities(avCapabalitiesBuilder.build())
354                                 .setUnavailableCapabilities(unavailableCapabilities)
355                                 .build())
356                 .build();
357         // TODO need to implement forwarding of this msg to master
358         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
359     }
360
361     @Override
362     public void onDeviceDisconnected() {
363         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
364         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
365         LOG.debug("onDeviceDisconnected received, unregistering role candidate");
366         topologyDispatcher.unregisterMountPoint(currentOperationalNode.getNodeId());
367         roleChangeStrategy.unregisterRoleCandidate();
368         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
369         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
370                 .addAugmentation(NetconfNode.class,
371                         new NetconfNodeBuilder()
372                                 .setConnectionStatus(ConnectionStatus.Connecting)
373                                 .setClusteredConnectionStatus(
374                                         new ClusteredConnectionStatusBuilder()
375                                                 .setNodeStatus(
376                                                         Collections.singletonList(
377                                                                 new NodeStatusBuilder()
378                                                                         .setNode(clusterExtension.selfAddress().toString())
379                                                                         .setStatus(Status.Unavailable)
380                                                                         .build()))
381                                                 .build())
382                                 .setHost(netconfNode.getHost())
383                                 .setPort(netconfNode.getPort())
384                                 .build()).build();
385         // TODO need to implement forwarding of this msg to master
386         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
387     }
388
389     @Override
390     public void onDeviceFailed(Throwable throwable) {
391         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
392         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
393         LOG.debug("onDeviceFailed received");
394         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
395
396         roleChangeStrategy.unregisterRoleCandidate();
397         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
398                 .addAugmentation(NetconfNode.class,
399                         new NetconfNodeBuilder()
400                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
401                                 .setClusteredConnectionStatus(
402                                         new ClusteredConnectionStatusBuilder()
403                                                 .setNodeStatus(
404                                                         Collections.singletonList(
405                                                                 new NodeStatusBuilder()
406                                                                         .setNode(clusterExtension.selfAddress().toString())
407                                                                         .setStatus(Status.Failed)
408                                                                         .build()))
409                                                 .build())
410                                 .setConnectedMessage(reason)
411                                 .build()).build();
412         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
413     }
414
415
    /**
     * Device notification hook. This implementation ignores the notification.
     *
     * @param domNotification notification received from the device; unused
     */
    @Override
    public void onNotification(DOMNotification domNotification) {
        //NOOP
    }
420
    /**
     * Close hook. This implementation does nothing; in particular it does not touch the
     * connection status listener registration or the mount point.
     */
    @Override
    public void close() {
        //NOOP
    }
425
    /**
     * Message hook. This implementation ignores every message.
     *
     * @param o        received message; unused
     * @param actorRef actor reference passed along with the message; unused
     */
    @Override
    public void onReceive(Object o, ActorRef actorRef) {
        // intentionally empty

    }
430 }