builder class for path creating
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfNodeManagerCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.dispatch.OnComplete;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.api.NetconfMessage;
34 import org.opendaylight.netconf.api.NetconfTerminationReason;
35 import org.opendaylight.netconf.client.NetconfClientSession;
36 import org.opendaylight.netconf.client.NetconfClientSessionListener;
37 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
38 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
39 import org.opendaylight.netconf.topology.NetconfTopology;
40 import org.opendaylight.netconf.topology.NodeManager;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.RoleChangeStrategy;
43 import org.opendaylight.netconf.topology.TopologyManager;
44 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
45 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
46 import org.opendaylight.netconf.topology.util.BaseNodeManager;
47 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
48 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
49 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
50 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
66 import org.opendaylight.yangtools.yang.common.QName;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.concurrent.Future;
71 import scala.concurrent.duration.FiniteDuration;
72
73 public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
74
75     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
76
77     public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
78         @Override
79         public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
80             return new UnavailableCapabilityBuilder()
81                     .setCapability(input.getKey().toString())
82                     .setFailureReason(input.getValue()).build();
83         }
84     };
85     public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
86         @Override
87         public String apply(QName qName) {
88             // intern string representation of a capability to avoid duplicates
89             return qName.toString().intern();
90         }
91     };
92
93     private static final String UNKNOWN_REASON = "Unknown reason";
94
    // true while the last role change reported this instance as owner; reset in onDeviceDisconnected()
    private boolean isMaster = false;
    // topology dispatcher used to connect/disconnect nodes and (un)register mount points
    private ClusteredNetconfTopology topologyDispatcher;
    // actor system used by the constructor to resolve the topology and node manager actors
    private final ActorSystem actorSystem;
    // cluster extension; its self address identifies this member in the clustered node status
    private final Cluster clusterExtension;

    // strategy used to (un)register this node as a role candidate
    private final RoleChangeStrategy roleChangeStrategy;

    // node id as a plain string; overwritten from the NodeId passed to onNodeCreated()
    private String nodeId;
    private String topologyId;
    // typed-actor proxies, assigned asynchronously once the constructor's actor lookups complete;
    // they are null until then (and stay null if the lookup never succeeds)
    private TopologyManager topologyManager;
    private NodeManager nodeManager;
    // cached context so that we can use it in callbacks from topology
    // (captured from TypedActor.context() in onNodeCreated())
    private ActorContext cachedContext;

    // last configuration node received in onNodeCreated(); source of host/port for published state
    private Node currentConfig;
    // most recently computed operational state, returned by getCurrentStatusForNode()
    private Node currentOperationalNode;

    // registrations obtained from the topology dispatcher after a successful connect
    private ConnectionStatusListenerRegistration connectionStatusregistration = null;
    private NetconfClientSessionListenerRegistration sessionListener = null;

    // sender of the last AnnounceMasterMountPoint message; cleared on AnnounceMasterMountPointDown
    private ActorRef masterDataBrokerRef = null;
    // true between onDeviceConnected() and the next onDeviceDisconnected()/onDeviceFailed()
    private boolean connected = false;
117
118     public NetconfNodeManagerCallback(final String nodeId,
119                                       final String topologyId,
120                                       final ActorSystem actorSystem,
121                                       final NetconfTopology topologyDispatcher,
122                                       final RoleChangeStrategy roleChangeStrategy) {
123         this.nodeId = nodeId;
124         this.topologyId = topologyId;
125         this.actorSystem = actorSystem;
126         this.clusterExtension = Cluster.get(actorSystem);
127         this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
128         this.roleChangeStrategy = roleChangeStrategy;
129
130         final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(topologyId);
131         final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection(pathCreator.build()).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
132         topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
133             @Override
134             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
135                 if (throwable != null) {
136                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
137
138                 }
139
140                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
141                 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
142             }
143         }, actorSystem.dispatcher());
144
145         final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection(pathCreator.withSuffix(nodeId).build()).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
146         nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
147             @Override
148             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
149                 if (throwable != null) {
150                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
151                 }
152                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
153                 nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
154             }
155         }, actorSystem.dispatcher());
156     }
157
158
159     @Nonnull
160     @Override public Node getInitialState(@Nonnull final NodeId nodeId,
161                                           @Nonnull final Node configNode) {
162         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
163
164         final Node initialNode = new NodeBuilder()
165                 .setNodeId(nodeId)
166                 .addAugmentation(NetconfNode.class,
167                         new NetconfNodeBuilder()
168                                 .setHost(netconfNode.getHost())
169                                 .setPort(netconfNode.getPort())
170                                 .setConnectionStatus(ConnectionStatus.Connecting)
171                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
172                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
173                                 .setClusteredConnectionStatus(
174                                         new ClusteredConnectionStatusBuilder()
175                                                 .setNodeStatus(
176                                                         Lists.newArrayList(
177                                                                 new NodeStatusBuilder()
178                                                                         .setNode(clusterExtension.selfAddress().toString())
179                                                                         .setStatus(Status.Unavailable)
180                                                                         .build()))
181                                                 .build())
182                                 .build())
183                 .build();
184
185         if (currentOperationalNode == null) {
186             currentOperationalNode = initialNode;
187         }
188
189         return initialNode;
190     }
191
192     @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
193                                                   @Nullable final Node configNode) {
194         final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
195
196         final Node failedNode = new NodeBuilder()
197                 .setNodeId(nodeId)
198                 .addAugmentation(NetconfNode.class,
199                         new NetconfNodeBuilder()
200                                 .setHost(netconfNode.getHost())
201                                 .setPort(netconfNode.getPort())
202                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
203                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
204                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
205                                 .setClusteredConnectionStatus(
206                                         new ClusteredConnectionStatusBuilder()
207                                                 .setNodeStatus(
208                                                         Collections.singletonList(
209                                                                 new NodeStatusBuilder()
210                                                                         .setNode(clusterExtension.selfAddress().toString())
211                                                                         .setStatus(Status.Failed)
212                                                                         .build()))
213                                                 .build())
214                                 .build())
215                 .build();
216
217         if (currentOperationalNode == null) {
218             currentOperationalNode = failedNode;
219         }
220
221         return failedNode;
222     }
223
224     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
225                                                                    @Nonnull final Node configNode) {
226         cachedContext = TypedActor.context();
227         this.nodeId = nodeId.getValue();
228         this.currentConfig = configNode;
229         // set initial state before anything happens
230         this.currentOperationalNode = getInitialState(nodeId, configNode);
231
232         // connect magic, send config into the netconf pipeline through topo dispatcher
233         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
234
235         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
236             @Override
237             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
238                 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
239                 sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
240             }
241
242             @Override
243             public void onFailure(Throwable t) {
244                 LOG.error("Connection to device failed", t);
245             }
246         });
247
248         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
249
250         // transform future result into state that gets written into datastore
251         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
252             @Nullable
253             @Override
254             public Node apply(NetconfDeviceCapabilities input) {
255                 // build state data
256                 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
257                         .addAugmentation(NetconfNode.class,
258                                 new NetconfNodeBuilder()
259                                         .setConnectionStatus(ConnectionStatus.Connected)
260                                         .setClusteredConnectionStatus(
261                                                 new ClusteredConnectionStatusBuilder()
262                                                         .setNodeStatus(
263                                                                 Collections.singletonList(
264                                                                         new NodeStatusBuilder()
265                                                                                 .setNode(clusterExtension.selfAddress().toString())
266                                                                                 .setStatus(Status.Connected)
267                                                                                 .build()))
268                                                         .build())
269                                         .setHost(netconfNode.getHost())
270                                         .setPort(netconfNode.getPort())
271                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
272                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
273                                         .build()).build();
274                 return currentOperationalNode;
275             }
276         });
277     }
278
279     @Nonnull
280     @Override
281     public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
282                                                 @Nonnull final Node configNode) {
283         // first disconnect this node
284         topologyDispatcher.unregisterMountPoint(nodeId);
285
286         if (connectionStatusregistration != null) {
287             connectionStatusregistration.close();
288         }
289         topologyDispatcher.disconnectNode(nodeId);
290
291         // now reinit this connection with new settings
292         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
293
294         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
295             @Override
296             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
297                 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
298             }
299
300             @Override
301             public void onFailure(Throwable t) {
302                 LOG.error("Connection to device failed", t);
303             }
304         });
305
306         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
307
308         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
309             @Nullable
310             @Override
311             public Node apply(NetconfDeviceCapabilities input) {
312                 // build state data
313                 return new NodeBuilder()
314                         .setNodeId(nodeId)
315                         .addAugmentation(NetconfNode.class,
316                                 new NetconfNodeBuilder()
317                                         .setConnectionStatus(ConnectionStatus.Connected)
318                                         .setClusteredConnectionStatus(
319                                                 new ClusteredConnectionStatusBuilder()
320                                                         .setNodeStatus(
321                                                                 Collections.singletonList(
322                                                                         new NodeStatusBuilder()
323                                                                                 .setNode(clusterExtension.selfAddress().toString())
324                                                                                 .setStatus(Status.Connected)
325                                                                                 .build()))
326                                                         .build())
327                                         .setHost(netconfNode.getHost())
328                                         .setPort(netconfNode.getPort())
329                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
330                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
331                                         .build())
332                         .build();
333             }
334         });
335     }
336
337     @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
338         // cleanup and disconnect
339         topologyDispatcher.unregisterMountPoint(nodeId);
340
341         if(connectionStatusregistration != null) {
342             connectionStatusregistration.close();
343         }
344         roleChangeStrategy.unregisterRoleCandidate();
345         return topologyDispatcher.disconnectNode(nodeId);
346     }
347
348     @Nonnull
349     @Override
350     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
351         LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
352         return Futures.immediateFuture(currentOperationalNode);
353     }
354
355     @Override
356     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
357         topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
358
359         isMaster = roleChangeDTO.isOwner();
360     }
361
362     @Override
363     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
364         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
365         connected = true;
366         if (isMaster) {
367             LOG.debug("Master is done with schema resolution, registering mount point");
368             topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
369         } else if (masterDataBrokerRef != null) {
370             LOG.warn("Device connected, master already present in topology, registering mount point");
371             topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
372         }
373
374         List<String> capabilityList = new ArrayList<>();
375         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
376         capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
377         final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
378         avCapabalitiesBuilder.setAvailableCapability(capabilityList);
379
380         final UnavailableCapabilities unavailableCapabilities =
381                 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
382                         .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
383
384         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
385         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
386                 .addAugmentation(NetconfNode.class,
387                         new NetconfNodeBuilder()
388                                 .setConnectionStatus(ConnectionStatus.Connected)
389                                 .setClusteredConnectionStatus(
390                                         new ClusteredConnectionStatusBuilder()
391                                                 .setNodeStatus(
392                                                         Collections.singletonList(
393                                                                 new NodeStatusBuilder()
394                                                                         .setNode(clusterExtension.selfAddress().toString())
395                                                                         .setStatus(Status.Connected)
396                                                                         .build()))
397                                                 .build())
398                                 .setHost(netconfNode.getHost())
399                                 .setPort(netconfNode.getPort())
400                                 .setAvailableCapabilities(avCapabalitiesBuilder.build())
401                                 .setUnavailableCapabilities(unavailableCapabilities)
402                                 .build())
403                 .build();
404         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
405     }
406
407     @Override
408     public void onDeviceDisconnected() {
409         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
410         LOG.debug("onDeviceDisconnected received, unregistered role candidate");
411         connected = false;
412         if (isMaster) {
413             // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
414             isMaster = false;
415             // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
416             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
417         }
418
419         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
420         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
421                 .addAugmentation(NetconfNode.class,
422                         new NetconfNodeBuilder()
423                                 .setConnectionStatus(ConnectionStatus.Connecting)
424                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
425                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
426                                 .setClusteredConnectionStatus(
427                                         new ClusteredConnectionStatusBuilder()
428                                                 .setNodeStatus(
429                                                         Collections.singletonList(
430                                                                 new NodeStatusBuilder()
431                                                                         .setNode(clusterExtension.selfAddress().toString())
432                                                                         .setStatus(Status.Unavailable)
433                                                                         .build()))
434                                                 .build())
435                                 .setHost(netconfNode.getHost())
436                                 .setPort(netconfNode.getPort())
437                                 .build()).build();
438         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
439     }
440
441     @Override
442     public void onDeviceFailed(Throwable throwable) {
443         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
444         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
445         LOG.warn("Netconf node {} failed with {}", nodeId, throwable);
446         connected = false;
447         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
448
449         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
450                 .addAugmentation(NetconfNode.class,
451                         new NetconfNodeBuilder()
452                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
453                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
454                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
455                                 .setClusteredConnectionStatus(
456                                         new ClusteredConnectionStatusBuilder()
457                                                 .setNodeStatus(
458                                                         Collections.singletonList(
459                                                                 new NodeStatusBuilder()
460                                                                         .setNode(clusterExtension.selfAddress().toString())
461                                                                         .setStatus(Status.Failed)
462                                                                         .build()))
463                                                 .build())
464                                 .setConnectedMessage(reason)
465                                 .build()).build();
466         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
467     }
468
469     @Override
470     public void onNotification(DOMNotification domNotification) {
471         //NOOP
472     }
473
474     @Override
475     public void close() {
476         //NOOP
477     }
478
479     @Override
480     public void onReceive(Object message, ActorRef actorRef) {
481         LOG.debug("Netconf node callback received message {}", message);
482         if (message instanceof AnnounceMasterMountPoint) {
483             masterDataBrokerRef = actorRef;
484             // candidate gets registered when mount point is already prepared so we can go ahead a register it
485             if (connected) {
486                 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
487             } else {
488                 LOG.debug("Announce master mount point msg received but mount point is not ready yet");
489             }
490         } else if (message instanceof AnnounceMasterMountPointDown) {
491             LOG.debug("Master mountpoint went down");
492             masterDataBrokerRef = null;
493             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
494         }
495     }
496
497     @Override
498     public void onSessionUp(NetconfClientSession netconfClientSession) {
499         //NetconfClientSession is up, we can register role candidate
500         LOG.debug("Netconf client session is up, registering role candidate");
501         roleChangeStrategy.registerRoleCandidate(nodeManager);
502     }
503
504     @Override
505     public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
506         LOG.debug("Netconf client session is down, unregistering role candidate");
507         roleChangeStrategy.unregisterRoleCandidate();
508     }
509
510     @Override
511     public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
512         LOG.debug("Netconf client session is down, unregistering role candidate");
513         roleChangeStrategy.unregisterRoleCandidate();
514     }
515
516     @Override
517     public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {
518         //NOOP
519     }
520 }