Move mount point registration after schema resolution
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfNodeManagerCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.dispatch.OnComplete;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.api.NetconfMessage;
34 import org.opendaylight.netconf.api.NetconfTerminationReason;
35 import org.opendaylight.netconf.client.NetconfClientSession;
36 import org.opendaylight.netconf.client.NetconfClientSessionListener;
37 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
38 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
39 import org.opendaylight.netconf.topology.NetconfTopology;
40 import org.opendaylight.netconf.topology.NodeManager;
41 import org.opendaylight.netconf.topology.NodeManagerCallback;
42 import org.opendaylight.netconf.topology.RoleChangeStrategy;
43 import org.opendaylight.netconf.topology.TopologyManager;
44 import org.opendaylight.netconf.topology.pipeline.ClusteredNetconfDeviceCommunicator.NetconfClientSessionListenerRegistration;
45 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
46 import org.opendaylight.netconf.topology.util.BaseNodeManager;
47 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
48 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
49 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
58 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
63 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
65 import org.opendaylight.yangtools.yang.common.QName;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71
72 public class NetconfNodeManagerCallback implements NodeManagerCallback, NetconfClientSessionListener{
73
74     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
75
76     public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
77         @Override
78         public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
79             return new UnavailableCapabilityBuilder()
80                     .setCapability(input.getKey().toString())
81                     .setFailureReason(input.getValue()).build();
82         }
83     };
84     public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
85         @Override
86         public String apply(QName qName) {
87             // intern string representation of a capability to avoid duplicates
88             return qName.toString().intern();
89         }
90     };
91
92     private static final String UNKNOWN_REASON = "Unknown reason";
93
94     private boolean isMaster = false;
95     private ClusteredNetconfTopology topologyDispatcher;
96     private final ActorSystem actorSystem;
97     private final Cluster clusterExtension;
98
99     private final RoleChangeStrategy roleChangeStrategy;
100
101     private String nodeId;
102     private String topologyId;
103     private TopologyManager topologyManager;
104     private NodeManager nodeManager;
105     // cached context so that we can use it in callbacks from topology
106     private ActorContext cachedContext;
107
108     private Node currentConfig;
109     private Node currentOperationalNode;
110
111     private ConnectionStatusListenerRegistration connectionStatusregistration = null;
112     private NetconfClientSessionListenerRegistration sessionListener = null;
113
114     private ActorRef masterDataBrokerRef = null;
115     private boolean connected = false;
116
117     public NetconfNodeManagerCallback(final String nodeId,
118                                       final String topologyId,
119                                       final ActorSystem actorSystem,
120                                       final NetconfTopology topologyDispatcher,
121                                       final RoleChangeStrategy roleChangeStrategy) {
122         this.nodeId = nodeId;
123         this.topologyId = topologyId;
124         this.actorSystem = actorSystem;
125         this.clusterExtension = Cluster.get(actorSystem);
126         this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
127         this.roleChangeStrategy = roleChangeStrategy;
128
129         final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
130         topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
131             @Override
132             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
133                 if (throwable != null) {
134                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
135
136                 }
137
138                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
139                 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
140             }
141         }, actorSystem.dispatcher());
142
143         final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
144         nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
145             @Override
146             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
147                 if (throwable != null) {
148                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
149                 }
150                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
151                 nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
152             }
153         }, actorSystem.dispatcher());
154     }
155
156
157     @Nonnull
158     @Override public Node getInitialState(@Nonnull final NodeId nodeId,
159                                           @Nonnull final Node configNode) {
160         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
161
162         final Node initialNode = new NodeBuilder()
163                 .setNodeId(nodeId)
164                 .addAugmentation(NetconfNode.class,
165                         new NetconfNodeBuilder()
166                                 .setHost(netconfNode.getHost())
167                                 .setPort(netconfNode.getPort())
168                                 .setConnectionStatus(ConnectionStatus.Connecting)
169                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
170                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
171                                 .setClusteredConnectionStatus(
172                                         new ClusteredConnectionStatusBuilder()
173                                                 .setNodeStatus(
174                                                         Lists.newArrayList(
175                                                                 new NodeStatusBuilder()
176                                                                         .setNode(clusterExtension.selfAddress().toString())
177                                                                         .setStatus(Status.Unavailable)
178                                                                         .build()))
179                                                 .build())
180                                 .build())
181                 .build();
182
183         if (currentOperationalNode == null) {
184             currentOperationalNode = initialNode;
185         }
186
187         return initialNode;
188     }
189
190     @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
191                                                   @Nullable final Node configNode) {
192         final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
193
194         final Node failedNode = new NodeBuilder()
195                 .setNodeId(nodeId)
196                 .addAugmentation(NetconfNode.class,
197                         new NetconfNodeBuilder()
198                                 .setHost(netconfNode.getHost())
199                                 .setPort(netconfNode.getPort())
200                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
201                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
202                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
203                                 .setClusteredConnectionStatus(
204                                         new ClusteredConnectionStatusBuilder()
205                                                 .setNodeStatus(
206                                                         Collections.singletonList(
207                                                                 new NodeStatusBuilder()
208                                                                         .setNode(clusterExtension.selfAddress().toString())
209                                                                         .setStatus(Status.Failed)
210                                                                         .build()))
211                                                 .build())
212                                 .build())
213                 .build();
214
215         if (currentOperationalNode == null) {
216             currentOperationalNode = failedNode;
217         }
218
219         return failedNode;
220     }
221
222     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
223                                                                    @Nonnull final Node configNode) {
224         cachedContext = TypedActor.context();
225         this.nodeId = nodeId.getValue();
226         this.currentConfig = configNode;
227         // set initial state before anything happens
228         this.currentOperationalNode = getInitialState(nodeId, configNode);
229
230         // connect magic, send config into the netconf pipeline through topo dispatcher
231         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
232
233         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
234             @Override
235             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
236                 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
237                 sessionListener = topologyDispatcher.registerNetconfClientSessionListener(nodeId, NetconfNodeManagerCallback.this);
238             }
239
240             @Override
241             public void onFailure(Throwable t) {
242                 LOG.error("Connection to device failed", t);
243             }
244         });
245
246         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
247
248         // transform future result into state that gets written into datastore
249         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
250             @Nullable
251             @Override
252             public Node apply(NetconfDeviceCapabilities input) {
253                 // build state data
254                 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
255                         .addAugmentation(NetconfNode.class,
256                                 new NetconfNodeBuilder()
257                                         .setConnectionStatus(ConnectionStatus.Connected)
258                                         .setClusteredConnectionStatus(
259                                                 new ClusteredConnectionStatusBuilder()
260                                                         .setNodeStatus(
261                                                                 Collections.singletonList(
262                                                                         new NodeStatusBuilder()
263                                                                                 .setNode(clusterExtension.selfAddress().toString())
264                                                                                 .setStatus(Status.Connected)
265                                                                                 .build()))
266                                                         .build())
267                                         .setHost(netconfNode.getHost())
268                                         .setPort(netconfNode.getPort())
269                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
270                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
271                                         .build()).build();
272                 return currentOperationalNode;
273             }
274         });
275     }
276
277     @Nonnull
278     @Override
279     public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
280                                                 @Nonnull final Node configNode) {
281         // first disconnect this node
282         topologyDispatcher.unregisterMountPoint(nodeId);
283
284         if (connectionStatusregistration != null) {
285             connectionStatusregistration.close();
286         }
287         topologyDispatcher.disconnectNode(nodeId);
288
289         // now reinit this connection with new settings
290         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
291
292         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
293             @Override
294             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
295                 connectionStatusregistration = topologyDispatcher.registerConnectionStatusListener(nodeId, NetconfNodeManagerCallback.this);
296             }
297
298             @Override
299             public void onFailure(Throwable t) {
300                 LOG.error("Connection to device failed", t);
301             }
302         });
303
304         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
305
306         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
307             @Nullable
308             @Override
309             public Node apply(NetconfDeviceCapabilities input) {
310                 // build state data
311                 return new NodeBuilder()
312                         .setNodeId(nodeId)
313                         .addAugmentation(NetconfNode.class,
314                                 new NetconfNodeBuilder()
315                                         .setConnectionStatus(ConnectionStatus.Connected)
316                                         .setClusteredConnectionStatus(
317                                                 new ClusteredConnectionStatusBuilder()
318                                                         .setNodeStatus(
319                                                                 Collections.singletonList(
320                                                                         new NodeStatusBuilder()
321                                                                                 .setNode(clusterExtension.selfAddress().toString())
322                                                                                 .setStatus(Status.Connected)
323                                                                                 .build()))
324                                                         .build())
325                                         .setHost(netconfNode.getHost())
326                                         .setPort(netconfNode.getPort())
327                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
328                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
329                                         .build())
330                         .build();
331             }
332         });
333     }
334
335     @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
336         // cleanup and disconnect
337         topologyDispatcher.unregisterMountPoint(nodeId);
338
339         if(connectionStatusregistration != null) {
340             connectionStatusregistration.close();
341         }
342         roleChangeStrategy.unregisterRoleCandidate();
343         return topologyDispatcher.disconnectNode(nodeId);
344     }
345
346     @Nonnull
347     @Override
348     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
349         LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
350         return Futures.immediateFuture(currentOperationalNode);
351     }
352
353     @Override
354     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
355         topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
356
357         isMaster = roleChangeDTO.isOwner();
358     }
359
360     @Override
361     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
362         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
363         connected = true;
364         if (isMaster) {
365             LOG.debug("Master is done with schema resolution, registering mount point");
366             topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
367         } else if (masterDataBrokerRef != null) {
368             LOG.warn("Device connected, master already present in topology, registering mount point");
369             topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
370         }
371
372         List<String> capabilityList = new ArrayList<>();
373         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
374         capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
375         final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
376         avCapabalitiesBuilder.setAvailableCapability(capabilityList);
377
378         final UnavailableCapabilities unavailableCapabilities =
379                 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
380                         .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
381
382         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
383         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
384                 .addAugmentation(NetconfNode.class,
385                         new NetconfNodeBuilder()
386                                 .setConnectionStatus(ConnectionStatus.Connected)
387                                 .setClusteredConnectionStatus(
388                                         new ClusteredConnectionStatusBuilder()
389                                                 .setNodeStatus(
390                                                         Collections.singletonList(
391                                                                 new NodeStatusBuilder()
392                                                                         .setNode(clusterExtension.selfAddress().toString())
393                                                                         .setStatus(Status.Connected)
394                                                                         .build()))
395                                                 .build())
396                                 .setHost(netconfNode.getHost())
397                                 .setPort(netconfNode.getPort())
398                                 .setAvailableCapabilities(avCapabalitiesBuilder.build())
399                                 .setUnavailableCapabilities(unavailableCapabilities)
400                                 .build())
401                 .build();
402         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
403     }
404
405     @Override
406     public void onDeviceDisconnected() {
407         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
408         LOG.debug("onDeviceDisconnected received, unregistered role candidate");
409         connected = false;
410         if (isMaster) {
411             // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
412             isMaster = false;
413             // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
414             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
415         }
416
417         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
418         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
419                 .addAugmentation(NetconfNode.class,
420                         new NetconfNodeBuilder()
421                                 .setConnectionStatus(ConnectionStatus.Connecting)
422                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
423                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
424                                 .setClusteredConnectionStatus(
425                                         new ClusteredConnectionStatusBuilder()
426                                                 .setNodeStatus(
427                                                         Collections.singletonList(
428                                                                 new NodeStatusBuilder()
429                                                                         .setNode(clusterExtension.selfAddress().toString())
430                                                                         .setStatus(Status.Unavailable)
431                                                                         .build()))
432                                                 .build())
433                                 .setHost(netconfNode.getHost())
434                                 .setPort(netconfNode.getPort())
435                                 .build()).build();
436         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
437     }
438
439     @Override
440     public void onDeviceFailed(Throwable throwable) {
441         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
442         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
443         LOG.warn("Netconf node {} failed with {}", nodeId, throwable);
444         connected = false;
445         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
446
447         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
448                 .addAugmentation(NetconfNode.class,
449                         new NetconfNodeBuilder()
450                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
451                                 .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
452                                 .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
453                                 .setClusteredConnectionStatus(
454                                         new ClusteredConnectionStatusBuilder()
455                                                 .setNodeStatus(
456                                                         Collections.singletonList(
457                                                                 new NodeStatusBuilder()
458                                                                         .setNode(clusterExtension.selfAddress().toString())
459                                                                         .setStatus(Status.Failed)
460                                                                         .build()))
461                                                 .build())
462                                 .setConnectedMessage(reason)
463                                 .build()).build();
464         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
465     }
466
467     @Override
468     public void onNotification(DOMNotification domNotification) {
469         //NOOP
470     }
471
472     @Override
473     public void close() {
474         //NOOP
475     }
476
477     @Override
478     public void onReceive(Object message, ActorRef actorRef) {
479         LOG.debug("Netconf node callback received message {}", message);
480         if (message instanceof AnnounceMasterMountPoint) {
481             masterDataBrokerRef = actorRef;
482             // candidate gets registered when mount point is already prepared so we can go ahead a register it
483             if (connected) {
484                 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
485             } else {
486                 LOG.debug("Announce master mount point msg received but mount point is not ready yet");
487             }
488         } else if (message instanceof AnnounceMasterMountPointDown) {
489             LOG.debug("Master mountpoint went down");
490             masterDataBrokerRef = null;
491             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
492         }
493     }
494
495     @Override
496     public void onSessionUp(NetconfClientSession netconfClientSession) {
497         //NetconfClientSession is up, we can register role candidate
498         LOG.debug("Netconf client session is up, registering role candidate");
499         roleChangeStrategy.registerRoleCandidate(nodeManager);
500     }
501
502     @Override
503     public void onSessionDown(NetconfClientSession netconfClientSession, Exception e) {
504         LOG.debug("Netconf client session is down, unregistering role candidate");
505         roleChangeStrategy.unregisterRoleCandidate();
506     }
507
508     @Override
509     public void onSessionTerminated(NetconfClientSession netconfClientSession, NetconfTerminationReason netconfTerminationReason) {
510         LOG.debug("Netconf client session is down, unregistering role candidate");
511         roleChangeStrategy.unregisterRoleCandidate();
512     }
513
514     @Override
515     public void onMessage(NetconfClientSession netconfClientSession, NetconfMessage netconfMessage) {
516         //NOOP
517     }
518 }