Bug 6714 - Use singleton service in clustered netconf topology
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfNodeManagerCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.dispatch.OnComplete;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.api.NetconfMessage;
34 import org.opendaylight.netconf.api.NetconfTerminationReason;
35 import org.opendaylight.netconf.client.NetconfClientSession;
36 import org.opendaylight.netconf.client.NetconfClientSessionListener;
37 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
38 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
39 import org.opendaylight.netconf.topology.NetconfTopology;
40 import org.opendaylight.netconf.topology.NodeManager;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.RoleChangeStrategy;
43 import org.opendaylight.netconf.topology.TopologyManager;
44 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
45 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
46 import org.opendaylight.netconf.topology.util.BaseNodeManager;
47 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
48 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
49 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
50 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
66 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
67 import org.opendaylight.yangtools.yang.common.QName;
68 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71 import scala.concurrent.Future;
72 import scala.concurrent.duration.FiniteDuration;
73
74 public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
75
76     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
77
78     public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
79         @Override
80         public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
81             return new UnavailableCapabilityBuilder()
82                     .setCapability(input.getKey().toString())
83                     .setFailureReason(input.getValue()).build();
84         }
85     };
86
87     private static final String UNKNOWN_REASON = "Unknown reason";
88
89     private boolean isMaster = false;
90     private ClusteredNetconfTopology topologyDispatcher;
91     private final ActorSystem actorSystem;
92     private final Cluster clusterExtension;
93
94     private final RoleChangeStrategy roleChangeStrategy;
95
96     private String nodeId;
97     private String topologyId;
98     private TopologyManager topologyManager;
99     private NodeManager nodeManager;
100     // cached context so that we can use it in callbacks from topology
101     private ActorContext cachedContext;
102
103     private Node currentConfig;
104     private Node currentOperationalNode;
105
106     private ConnectionStatusListenerRegistration connectionStatusregistration = null;
107     private NetconfClientSessionListenerRegistration sessionListener = null;
108
109     private ActorRef masterDataBrokerRef = null;
110     private boolean connected = false;
111
112     public NetconfNodeManagerCallback(final String nodeId,
113                                       final String topologyId,
114                                       final ActorSystem actorSystem,
115                                       final NetconfTopology topologyDispatcher,
116                                       final RoleChangeStrategy roleChangeStrategy) {
117         this.nodeId = nodeId;
118         this.topologyId = topologyId;
119         this.actorSystem = actorSystem;
120         this.clusterExtension = Cluster.get(actorSystem);
121         this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
122         this.roleChangeStrategy = roleChangeStrategy;
123
124         final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(topologyId);
125         final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection(pathCreator.build()).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
126         topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
127             @Override
128             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
129                 if (throwable != null) {
130                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
131
132                 }
133
134                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
135                 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
136             }
137         }, actorSystem.dispatcher());
138
139         final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection(pathCreator.withSuffix(nodeId).build()).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
140         nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
141             @Override
142             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
143                 if (throwable != null) {
144                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
145                 }
146                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
147                 nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
148             }
149         }, actorSystem.dispatcher());
150     }
151
152
153     @Nonnull
154     @Override public Node getInitialState(@Nonnull final NodeId nodeId,
155                                           @Nonnull final Node configNode) {
156         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
157
158         final Node initialNode = new NodeBuilder()
159                 .setNodeId(nodeId)
160                 .addAugmentation(NetconfNode.class,
161                         new NetconfNodeBuilder()
162                                 .setHost(netconfNode.getHost())
163                                 .setPort(netconfNode.getPort())
164                                 .setConnectionStatus(ConnectionStatus.Connecting)
165                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
166                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
167                                 .setClusteredConnectionStatus(
168                                         new ClusteredConnectionStatusBuilder()
169                                                 .setNodeStatus(
170                                                         Lists.newArrayList(
171                                                                 new NodeStatusBuilder()
172                                                                         .setNode(clusterExtension.selfAddress().toString())
173                                                                         .setStatus(Status.Unavailable)
174                                                                         .build()))
175                                                 .build())
176                                 .build())
177                 .build();
178
179         if (currentOperationalNode == null) {
180             currentOperationalNode = initialNode;
181         }
182
183         return initialNode;
184     }
185
186     @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
187                                                   @Nullable final Node configNode) {
188         final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
189
190         final Node failedNode = new NodeBuilder()
191                 .setNodeId(nodeId)
192                 .addAugmentation(NetconfNode.class,
193                         new NetconfNodeBuilder()
194                                 .setHost(netconfNode.getHost())
195                                 .setPort(netconfNode.getPort())
196                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
197                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
198                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
199                                 .setClusteredConnectionStatus(
200                                         new ClusteredConnectionStatusBuilder()
201                                                 .setNodeStatus(
202                                                         Collections.singletonList(
203                                                                 new NodeStatusBuilder()
204                                                                         .setNode(clusterExtension.selfAddress().toString())
205                                                                         .setStatus(Status.Failed)
206                                                                         .build()))
207                                                 .build())
208                                 .build())
209                 .build();
210
211         if (currentOperationalNode == null) {
212             currentOperationalNode = failedNode;
213         }
214
215         return failedNode;
216     }
217
218     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
219                                                                    @Nonnull final Node configNode) {
220         cachedContext = TypedActor.context();
221         this.nodeId = nodeId.getValue();
222         this.currentConfig = configNode;
223         // set initial state before anything happens
224         this.currentOperationalNode = getInitialState(nodeId, configNode);
225
226         // connect magic, send config into the netconf pipeline through topo dispatcher
227         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
228
229         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
230             @Override
231             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
232                 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
233                 sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
234             }
235
236             @Override
237             public void onFailure(Throwable t) {
238                 LOG.error("Connection to device failed", t);
239             }
240         });
241
242         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
243
244         // transform future result into state that gets written into datastore
245         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
246             @Nullable
247             @Override
248             public Node apply(NetconfDeviceCapabilities input) {
249                 // build state data
250                 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
251                         .addAugmentation(NetconfNode.class,
252                                 new NetconfNodeBuilder()
253                                         .setConnectionStatus(ConnectionStatus.Connected)
254                                         .setClusteredConnectionStatus(
255                                                 new ClusteredConnectionStatusBuilder()
256                                                         .setNodeStatus(
257                                                                 Collections.singletonList(
258                                                                         new NodeStatusBuilder()
259                                                                                 .setNode(clusterExtension.selfAddress().toString())
260                                                                                 .setStatus(Status.Connected)
261                                                                                 .build()))
262                                                         .build())
263                                         .setHost(netconfNode.getHost())
264                                         .setPort(netconfNode.getPort())
265                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
266                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
267                                         .build()).build();
268                 return currentOperationalNode;
269             }
270         });
271     }
272
273     @Nonnull
274     @Override
275     public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
276                                                 @Nonnull final Node configNode) {
277         // first disconnect this node
278         topologyDispatcher.unregisterMountPoint(nodeId);
279
280         if (connectionStatusregistration != null) {
281             connectionStatusregistration.close();
282         }
283         topologyDispatcher.disconnectNode(nodeId);
284
285         // now reinit this connection with new settings
286         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
287
288         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
289             @Override
290             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
291                 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
292             }
293
294             @Override
295             public void onFailure(Throwable t) {
296                 LOG.error("Connection to device failed", t);
297             }
298         });
299
300         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
301
302         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
303             @Nullable
304             @Override
305             public Node apply(NetconfDeviceCapabilities input) {
306                 // build state data
307                 return new NodeBuilder()
308                         .setNodeId(nodeId)
309                         .addAugmentation(NetconfNode.class,
310                                 new NetconfNodeBuilder()
311                                         .setConnectionStatus(ConnectionStatus.Connected)
312                                         .setClusteredConnectionStatus(
313                                                 new ClusteredConnectionStatusBuilder()
314                                                         .setNodeStatus(
315                                                                 Collections.singletonList(
316                                                                         new NodeStatusBuilder()
317                                                                                 .setNode(clusterExtension.selfAddress().toString())
318                                                                                 .setStatus(Status.Connected)
319                                                                                 .build()))
320                                                         .build())
321                                         .setHost(netconfNode.getHost())
322                                         .setPort(netconfNode.getPort())
323                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
324                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
325                                         .build())
326                         .build();
327             }
328         });
329     }
330
331     @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
332         // cleanup and disconnect
333         topologyDispatcher.unregisterMountPoint(nodeId);
334
335         if(connectionStatusregistration != null) {
336             connectionStatusregistration.close();
337         }
338         roleChangeStrategy.unregisterRoleCandidate();
339         return topologyDispatcher.disconnectNode(nodeId);
340     }
341
342     @Nonnull
343     @Override
344     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
345         LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
346         return Futures.immediateFuture(currentOperationalNode);
347     }
348
349     @Override
350     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
351         topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
352
353         isMaster = roleChangeDTO.isOwner();
354     }
355
356     @Override
357     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
358         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
359         connected = true;
360         if (isMaster) {
361             LOG.debug("Master is done with schema resolution, registering mount point");
362             topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
363         } else if (masterDataBrokerRef != null) {
364             LOG.warn("Device connected, master already present in topology, registering mount point");
365             topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
366         }
367
368         List<AvailableCapability> capabilityList = new ArrayList<>();
369         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
370         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities());
371         final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
372         avCapabalitiesBuilder.setAvailableCapability(capabilityList);
373
374         final UnavailableCapabilities unavailableCapabilities =
375                 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
376                         .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
377
378         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
379         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
380                 .addAugmentation(NetconfNode.class,
381                         new NetconfNodeBuilder()
382                                 .setConnectionStatus(ConnectionStatus.Connected)
383                                 .setClusteredConnectionStatus(
384                                         new ClusteredConnectionStatusBuilder()
385                                                 .setNodeStatus(
386                                                         Collections.singletonList(
387                                                                 new NodeStatusBuilder()
388                                                                         .setNode(clusterExtension.selfAddress().toString())
389                                                                         .setStatus(Status.Connected)
390                                                                         .build()))
391                                                 .build())
392                                 .setHost(netconfNode.getHost())
393                                 .setPort(netconfNode.getPort())
394                                 .setAvailableCapabilities(avCapabalitiesBuilder.build())
395                                 .setUnavailableCapabilities(unavailableCapabilities)
396                                 .build())
397                 .build();
398         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
399     }
400
401     @Override
402     public void onDeviceDisconnected() {
403         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
404         LOG.debug("onDeviceDisconnected received, unregistered role candidate");
405         connected = false;
406         if (isMaster) {
407             // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
408             isMaster = false;
409             // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
410             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
411         }
412
413         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
414         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
415                 .addAugmentation(NetconfNode.class,
416                         new NetconfNodeBuilder()
417                                 .setConnectionStatus(ConnectionStatus.Connecting)
418                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
419                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
420                                 .setClusteredConnectionStatus(
421                                         new ClusteredConnectionStatusBuilder()
422                                                 .setNodeStatus(
423                                                         Collections.singletonList(
424                                                                 new NodeStatusBuilder()
425                                                                         .setNode(clusterExtension.selfAddress().toString())
426                                                                         .setStatus(Status.Unavailable)
427                                                                         .build()))
428                                                 .build())
429                                 .setHost(netconfNode.getHost())
430                                 .setPort(netconfNode.getPort())
431                                 .build()).build();
432         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
433     }
434
435     @Override
436     public void onDeviceFailed(Throwable throwable) {
437         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
438         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
439         LOG.warn("Netconf node {} failed with {}", nodeId, throwable);
440         connected = false;
441         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
442
443         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
444                 .addAugmentation(NetconfNode.class,
445                         new NetconfNodeBuilder()
446                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
447                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<AvailableCapability>()).build())
448                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
449                                 .setClusteredConnectionStatus(
450                                         new ClusteredConnectionStatusBuilder()
451                                                 .setNodeStatus(
452                                                         Collections.singletonList(
453                                                                 new NodeStatusBuilder()
454                                                                         .setNode(clusterExtension.selfAddress().toString())
455                                                                         .setStatus(Status.Failed)
456                                                                         .build()))
457                                                 .build())
458                                 .setConnectedMessage(reason)
459                                 .build()).build();
460         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
461     }
462
463     @Override
464     public void onNotification(DOMNotification domNotification) {
465         //NOOP
466     }
467
468     @Override
469     public void close() {
470         //NOOP
471     }
472
473     @Override
474     public void onReceive(Object message, ActorRef actorRef) {
475         LOG.debug("Netconf node callback received message {}", message);
476         if (message instanceof AnnounceMasterMountPoint) {
477             masterDataBrokerRef = actorRef;
478             // candidate gets registered when mount point is already prepared so we can go ahead a register it
479             if (connected) {
480                 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
481             } else {
482                 LOG.debug("Announce master mount point msg received but mount point is not ready yet");
483             }
484         } else if (message instanceof AnnounceMasterMountPointDown) {
485             LOG.debug("Master mountpoint went down");
486             masterDataBrokerRef = null;
487             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
488         }
489     }
490
491     @Override
492     public void onSessionUp(NetconfClientSession netconfClientSession) {
493         //NetconfClientSession is up, we can register role candidate
494         LOG.debug("Netconf client session is up, registering role candidate");
495         roleChangeStrategy.registerRoleCandidate(nodeManager);
496     }
497
498     @Override
499     public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
500         LOG.debug("Netconf client session is down, unregistering role candidate");
501         roleChangeStrategy.unregisterRoleCandidate();
502     }
503
504     @Override
505     public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
506         LOG.debug("Netconf client session is down, unregistering role candidate");
507         roleChangeStrategy.unregisterRoleCandidate();
508     }
509
510     @Override
511     public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {
512         //NOOP
513     }
514 }