Use normal identify messages first
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / impl / NetconfNodeManagerCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.impl;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedProps;
16 import akka.cluster.Cluster;
17 import akka.dispatch.OnComplete;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map.Entry;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
32 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
34 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
35 import org.opendaylight.netconf.topology.NetconfTopology;
36 import org.opendaylight.netconf.topology.NodeManager;
37 import org.opendaylight.netconf.topology.NodeManagerCallback;
38 import org.opendaylight.netconf.topology.RoleChangeStrategy;
39 import org.opendaylight.netconf.topology.TopologyManager;
40 import org.opendaylight.netconf.topology.pipeline.TopologyMountPointFacade.ConnectionStatusListenerRegistration;
41 import org.opendaylight.netconf.topology.util.BaseNodeManager;
42 import org.opendaylight.netconf.topology.util.BaseTopologyManager;
43 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPoint;
44 import org.opendaylight.netconf.topology.util.messages.AnnounceMasterMountPointDown;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilities;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability.FailureReason;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapabilityBuilder;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
60 import org.opendaylight.yangtools.yang.common.QName;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.Future;
65 import scala.concurrent.duration.FiniteDuration;
66
67 public class NetconfNodeManagerCallback implements NodeManagerCallback{
68
69     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeManagerCallback.class);
70
71     public static final Function<Entry<QName, FailureReason>, UnavailableCapability> UNAVAILABLE_CAPABILITY_TRANSFORMER = new Function<Entry<QName, FailureReason>, UnavailableCapability>() {
72         @Override
73         public UnavailableCapability apply(final Entry<QName, FailureReason> input) {
74             return new UnavailableCapabilityBuilder()
75                     .setCapability(input.getKey().toString())
76                     .setFailureReason(input.getValue()).build();
77         }
78     };
79     public static final Function<QName, String> AVAILABLE_CAPABILITY_TRANSFORMER = new Function<QName, String>() {
80         @Override
81         public String apply(QName qName) {
82             // intern string representation of a capability to avoid duplicates
83             return qName.toString().intern();
84         }
85     };
86
87     private static final String UNKNOWN_REASON = "Unknown reason";
88
89     private boolean isMaster = false;
90     private ClusteredNetconfTopology topologyDispatcher;
91     private final ActorSystem actorSystem;
92     private final Cluster clusterExtension;
93
94     private final RoleChangeStrategy roleChangeStrategy;
95
96     private String nodeId;
97     private String topologyId;
98     private TopologyManager topologyManager;
99     private NodeManager nodeManager;
100     // cached context so that we can use it in callbacks from topology
101     private ActorContext cachedContext;
102
103     private Node currentConfig;
104     private Node currentOperationalNode;
105
106     private ConnectionStatusListenerRegistration registration = null;
107
108     private ActorRef masterDataBrokerRef = null;
109     private boolean connected = false;
110
111     public NetconfNodeManagerCallback(final String nodeId,
112                                       final String topologyId,
113                                       final ActorSystem actorSystem,
114                                       final NetconfTopology topologyDispatcher,
115                                       final RoleChangeStrategy roleChangeStrategy) {
116         this.nodeId = nodeId;
117         this.topologyId = topologyId;
118         this.actorSystem = actorSystem;
119         this.clusterExtension = Cluster.get(actorSystem);
120         this.topologyDispatcher = (ClusteredNetconfTopology) topologyDispatcher;
121         this.roleChangeStrategy = roleChangeStrategy;
122
123         final Future<ActorRef> topologyRefFuture = actorSystem.actorSelection("/user/" + topologyId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
124         topologyRefFuture.onComplete(new OnComplete<ActorRef>() {
125             @Override
126             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
127                 if (throwable != null) {
128                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId, throwable);
129
130                 }
131
132                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
133                 topologyManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
134             }
135         }, actorSystem.dispatcher());
136
137         final Future<ActorRef> nodeRefFuture = actorSystem.actorSelection("/user/" + topologyId + "/" + nodeId).resolveOne(FiniteDuration.create(10L, TimeUnit.SECONDS));
138         nodeRefFuture.onComplete(new OnComplete<ActorRef>() {
139             @Override
140             public void onComplete(Throwable throwable, ActorRef actorRef) throws Throwable {
141                 if (throwable != null) {
142                     LOG.warn("Unable to resolve actor for path: {} ", "/user/" + topologyId + "/" + nodeId, throwable);
143                 }
144                 LOG.debug("Actor ref for path {} resolved", "/user/" + topologyId);
145                 nodeManager = TypedActor.get(actorSystem).typedActorOf(new TypedProps<>(NodeManager.class, BaseNodeManager.class), actorRef);
146             }
147         }, actorSystem.dispatcher());
148     }
149
150
151     @Nonnull
152     @Override public Node getInitialState(@Nonnull final NodeId nodeId,
153                                           @Nonnull final Node configNode) {
154         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
155
156         final Node initialNode = new NodeBuilder()
157                 .setNodeId(nodeId)
158                 .addAugmentation(NetconfNode.class,
159                         new NetconfNodeBuilder()
160                                 .setHost(netconfNode.getHost())
161                                 .setPort(netconfNode.getPort())
162                                 .setConnectionStatus(ConnectionStatus.Connecting)
163                                 .setClusteredConnectionStatus(
164                                         new ClusteredConnectionStatusBuilder()
165                                                 .setNodeStatus(
166                                                         Lists.newArrayList(
167                                                                 new NodeStatusBuilder()
168                                                                         .setNode(clusterExtension.selfAddress().toString())
169                                                                         .setStatus(Status.Unavailable)
170                                                                         .build()))
171                                                 .build())
172                                 .build())
173                 .build();
174
175         if (currentOperationalNode == null) {
176             currentOperationalNode = initialNode;
177         }
178
179         return initialNode;
180     }
181
182     @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
183                                                   @Nullable final Node configNode) {
184         final NetconfNode netconfNode = configNode == null ? currentOperationalNode.getAugmentation(NetconfNode.class) : configNode.getAugmentation(NetconfNode.class);
185
186         final Node failedNode = new NodeBuilder()
187                 .setNodeId(nodeId)
188                 .addAugmentation(NetconfNode.class,
189                         new NetconfNodeBuilder()
190                                 .setHost(netconfNode.getHost())
191                                 .setPort(netconfNode.getPort())
192                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
193                                 .setClusteredConnectionStatus(
194                                         new ClusteredConnectionStatusBuilder()
195                                                 .setNodeStatus(
196                                                         Collections.singletonList(
197                                                                 new NodeStatusBuilder()
198                                                                         .setNode(clusterExtension.selfAddress().toString())
199                                                                         .setStatus(Status.Failed)
200                                                                         .build()))
201                                                 .build())
202                                 .build())
203                 .build();
204
205         if (currentOperationalNode == null) {
206             currentOperationalNode = failedNode;
207         }
208
209         return failedNode;
210     }
211
212     @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
213                                                                    @Nonnull final Node configNode) {
214         cachedContext = TypedActor.context();
215         this.nodeId = nodeId.getValue();
216         this.currentConfig = configNode;
217         // set initial state before anything happens
218         this.currentOperationalNode = getInitialState(nodeId, configNode);
219
220         // connect magic, send config into the netconf pipeline through topo dispatcher
221         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
222
223         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
224             @Override
225             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
226                 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
227             }
228
229             @Override
230             public void onFailure(Throwable t) {
231                 LOG.error("Connection to device failed", t);
232             }
233         });
234
235         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
236
237         // transform future result into state that gets written into datastore
238         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
239             @Nullable
240             @Override
241             public Node apply(NetconfDeviceCapabilities input) {
242                 // build state data
243                 currentOperationalNode = new NodeBuilder().setNodeId(nodeId)
244                         .addAugmentation(NetconfNode.class,
245                                 new NetconfNodeBuilder()
246                                         .setConnectionStatus(ConnectionStatus.Connected)
247                                         .setClusteredConnectionStatus(
248                                                 new ClusteredConnectionStatusBuilder()
249                                                         .setNodeStatus(
250                                                                 Collections.singletonList(
251                                                                         new NodeStatusBuilder()
252                                                                                 .setNode(clusterExtension.selfAddress().toString())
253                                                                                 .setStatus(Status.Connected)
254                                                                                 .build()))
255                                                         .build())
256                                         .setHost(netconfNode.getHost())
257                                         .setPort(netconfNode.getPort())
258                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
259                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
260                                         .build()).build();
261                 return currentOperationalNode;
262             }
263         });
264     }
265
266     @Nonnull
267     @Override
268     public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
269                                                 @Nonnull final Node configNode) {
270         // first disconnect this node
271         topologyDispatcher.unregisterMountPoint(nodeId);
272         if (registration != null) {
273             registration.close();
274         }
275         topologyDispatcher.disconnectNode(nodeId);
276
277         // now reinit this connection with new settings
278         final ListenableFuture<NetconfDeviceCapabilities> connectionFuture = topologyDispatcher.connectNode(nodeId, configNode);
279
280         Futures.addCallback(connectionFuture, new FutureCallback<NetconfDeviceCapabilities>() {
281             @Override
282             public void onSuccess(@Nullable NetconfDeviceCapabilities result) {
283                 registration = topologyDispatcher.registerConnectionStatusListener(nodeId, nodeManager);
284             }
285
286             @Override
287             public void onFailure(Throwable t) {
288                 LOG.error("Connection to device failed", t);
289             }
290         });
291
292         final NetconfNode netconfNode = configNode.getAugmentation(NetconfNode.class);
293
294         return Futures.transform(connectionFuture, new Function<NetconfDeviceCapabilities, Node>() {
295             @Nullable
296             @Override
297             public Node apply(NetconfDeviceCapabilities input) {
298                 // build state data
299                 return new NodeBuilder()
300                         .setNodeId(nodeId)
301                         .addAugmentation(NetconfNode.class,
302                                 new NetconfNodeBuilder()
303                                         .setConnectionStatus(ConnectionStatus.Connected)
304                                         .setClusteredConnectionStatus(
305                                                 new ClusteredConnectionStatusBuilder()
306                                                         .setNodeStatus(
307                                                                 Collections.singletonList(
308                                                                         new NodeStatusBuilder()
309                                                                                 .setNode(clusterExtension.selfAddress().toString())
310                                                                                 .setStatus(Status.Connected)
311                                                                                 .build()))
312                                                         .build())
313                                         .setHost(netconfNode.getHost())
314                                         .setPort(netconfNode.getPort())
315                                         .setAvailableCapabilities(new AvailableCapabilitiesBuilder().build())
316                                         .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().build())
317                                         .build())
318                         .build();
319             }
320         });
321     }
322
323     @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
324         // cleanup and disconnect
325         topologyDispatcher.unregisterMountPoint(nodeId);
326         if (registration != null) {
327             registration.close();
328         }
329         roleChangeStrategy.unregisterRoleCandidate();
330         return topologyDispatcher.disconnectNode(nodeId);
331     }
332
333     @Nonnull
334     @Override
335     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
336         LOG.debug("Getting current status for node: {} status: {}", nodeId, currentOperationalNode);
337         return Futures.immediateFuture(currentOperationalNode);
338     }
339
340     @Override
341     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
342         topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
343
344         isMaster = roleChangeDTO.isOwner();
345         if (isMaster) {
346             LOG.warn("Gained ownership of node - registering master mount point");
347             topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId));
348         } else {
349             // even though mount point is ready, we dont know who the master mount point will be since we havent received the announce msg
350             // after we receive the message we can go ahead and register the mount point
351             if (connected && masterDataBrokerRef != null) {
352                 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
353             } else {
354                 LOG.debug("Mount point is ready, still waiting for master mount point");
355             }
356         }
357     }
358
359     @Override
360     public void onDeviceConnected(final SchemaContext remoteSchemaContext, final NetconfSessionPreferences netconfSessionPreferences, final DOMRpcService deviceRpc) {
361         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
362         LOG.debug("onDeviceConnected received, registering role candidate");
363         connected = true;
364         roleChangeStrategy.registerRoleCandidate(nodeManager);
365         if (!isMaster && masterDataBrokerRef != null) {
366             // if we're not master but one is present already, we need to register mountpoint
367             LOG.warn("Device connected, master already present in topology, registering mount point");
368             topologyDispatcher.registerMountPoint(cachedContext, new NodeId(nodeId), masterDataBrokerRef);
369         }
370         List<String> capabilityList = new ArrayList<>();
371         capabilityList.addAll(netconfSessionPreferences.getNetconfDeviceCapabilities().getNonModuleBasedCapabilities());
372         capabilityList.addAll(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getResolvedCapabilities()).transform(AVAILABLE_CAPABILITY_TRANSFORMER).toList());
373         final AvailableCapabilitiesBuilder avCapabalitiesBuilder = new AvailableCapabilitiesBuilder();
374         avCapabalitiesBuilder.setAvailableCapability(capabilityList);
375
376         final UnavailableCapabilities unavailableCapabilities =
377                 new UnavailableCapabilitiesBuilder().setUnavailableCapability(FluentIterable.from(netconfSessionPreferences.getNetconfDeviceCapabilities().getUnresolvedCapabilites().entrySet())
378                         .transform(UNAVAILABLE_CAPABILITY_TRANSFORMER).toList()).build();
379
380         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
381         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
382                 .addAugmentation(NetconfNode.class,
383                         new NetconfNodeBuilder()
384                                 .setConnectionStatus(ConnectionStatus.Connected)
385                                 .setClusteredConnectionStatus(
386                                         new ClusteredConnectionStatusBuilder()
387                                                 .setNodeStatus(
388                                                         Collections.singletonList(
389                                                                 new NodeStatusBuilder()
390                                                                         .setNode(clusterExtension.selfAddress().toString())
391                                                                         .setStatus(Status.Connected)
392                                                                         .build()))
393                                                 .build())
394                                 .setHost(netconfNode.getHost())
395                                 .setPort(netconfNode.getPort())
396                                 .setAvailableCapabilities(avCapabalitiesBuilder.build())
397                                 .setUnavailableCapabilities(unavailableCapabilities)
398                                 .build())
399                 .build();
400         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
401     }
402
403     @Override
404     public void onDeviceDisconnected() {
405         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
406         LOG.debug("onDeviceDisconnected received, unregistering role candidate");
407         connected = false;
408         if (isMaster) {
409             // announce that master mount point is going down
410 //            for (final Member member : clusterExtension.state().getMembers()) {
411 //                actorSystem.actorSelection(member.address() + "/user/" + topologyId + "/" + nodeId).tell(new AnnounceMasterMountPointDown(), null);
412 //            }
413             // set master to false since we are unregistering, the ownershipChanged callback can sometimes lag behind causing multiple nodes behaving as masters
414             isMaster = false;
415             // onRoleChanged() callback can sometimes lag behind, so unregister the mount right when it disconnects
416             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
417         }
418         roleChangeStrategy.unregisterRoleCandidate();
419
420         final NetconfNode netconfNode = currentConfig.getAugmentation(NetconfNode.class);
421         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
422                 .addAugmentation(NetconfNode.class,
423                         new NetconfNodeBuilder()
424                                 .setConnectionStatus(ConnectionStatus.Connecting)
425                                 .setClusteredConnectionStatus(
426                                         new ClusteredConnectionStatusBuilder()
427                                                 .setNodeStatus(
428                                                         Collections.singletonList(
429                                                                 new NodeStatusBuilder()
430                                                                         .setNode(clusterExtension.selfAddress().toString())
431                                                                         .setStatus(Status.Unavailable)
432                                                                         .build()))
433                                                 .build())
434                                 .setHost(netconfNode.getHost())
435                                 .setPort(netconfNode.getPort())
436                                 .build()).build();
437         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
438     }
439
440     @Override
441     public void onDeviceFailed(Throwable throwable) {
442         // we need to notify the higher level that something happened, get a current status from all other nodes, and aggregate a new result
443         // no need to remove mountpoint, we should receive onRoleChanged callback after unregistering from election that unregisters the mountpoint
444         LOG.debug("onDeviceFailed received");
445         connected = false;
446         String reason = (throwable != null && throwable.getMessage() != null) ? throwable.getMessage() : UNKNOWN_REASON;
447
448         roleChangeStrategy.unregisterRoleCandidate();
449         currentOperationalNode = new NodeBuilder().setNodeId(new NodeId(nodeId))
450                 .addAugmentation(NetconfNode.class,
451                         new NetconfNodeBuilder()
452                                 .setConnectionStatus(ConnectionStatus.UnableToConnect)
453                                 .setClusteredConnectionStatus(
454                                         new ClusteredConnectionStatusBuilder()
455                                                 .setNodeStatus(
456                                                         Collections.singletonList(
457                                                                 new NodeStatusBuilder()
458                                                                         .setNode(clusterExtension.selfAddress().toString())
459                                                                         .setStatus(Status.Failed)
460                                                                         .build()))
461                                                 .build())
462                                 .setConnectedMessage(reason)
463                                 .build()).build();
464         topologyManager.notifyNodeStatusChange(new NodeId(nodeId));
465     }
466
467     @Override
468     public void onNotification(DOMNotification domNotification) {
469         //NOOP
470     }
471
472     @Override
473     public void close() {
474         //NOOP
475     }
476
477     @Override
478     public void onReceive(Object message, ActorRef actorRef) {
479         LOG.warn("Netconf node callback received message {}", message);
480         if (message instanceof AnnounceMasterMountPoint) {
481             masterDataBrokerRef = actorRef;
482             // candidate gets registered when mount point is already prepared so we can go ahead a register it
483             if (roleChangeStrategy.isCandidateRegistered()) {
484                 topologyDispatcher.registerMountPoint(TypedActor.context(), new NodeId(nodeId), masterDataBrokerRef);
485             } else {
486                 LOG.warn("Announce master mount point msg received but mount point is not ready yet");
487             }
488         } else if (message instanceof AnnounceMasterMountPointDown) {
489             LOG.warn("Master mountpoint went down");
490             masterDataBrokerRef = null;
491             topologyDispatcher.unregisterMountPoint(new NodeId(nodeId));
492         }
493     }
494 }