2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.util;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.HashSet;
40 import java.util.Map.Entry;
41 import java.util.Random;
43 import java.util.concurrent.TimeUnit;
44 import javax.annotation.Nonnull;
45 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
46 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.netconf.topology.RoleChangeStrategy;
50 import org.opendaylight.netconf.topology.StateAggregator;
51 import org.opendaylight.netconf.topology.TopologyManager;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback;
53 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
54 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
55 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
56 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
57 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.yang.binding.DataObject;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.concurrent.Future;
71 import scala.concurrent.duration.FiniteDuration;
72 import scala.concurrent.impl.Promise.DefaultPromise;
74 public final class BaseTopologyManager
75 implements TopologyManager {
// Class logger.
77 private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
// Datastore path of the managed topology; built from topologyId in the constructor and read in resyncPeer.
79 private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
// Actor system handed to the constructor; its scheduler and dispatcher are used for identify retries in onReceive.
81 private final ActorSystem system;
// Typed-actor extension of the system; used to wrap peer actor refs in TopologyManager proxies.
82 private final TypedActorExtension typedExtension;
// Cluster extension of the system; used for event subscription, join/leave and the local address.
83 private final Cluster clusterExtension;
// Converts between binding objects and their normalized form for messages exchanged with peers.
85 private final BindingNormalizedNodeCodecRegistry codecRegistry;
// NOTE(review): not referenced anywhere in the visible part of this class - confirm whether it is still needed.
87 private static final String PATH = "/user/";
// Data broker; resyncPeer opens a read-only transaction on it.
89 private final DataBroker dataBroker;
// Strategy this manager registers with as a role candidate in preStart.
90 private final RoleChangeStrategy roleChangeStrategy;
// Combines the per-manager futures of create/update/delete attempts into one result.
91 private final StateAggregator aggregator;
// Receives the aggregated node state (init/update/delete).
93 private final NodeWriter naSalNodeWriter;
// Id of the topology this manager is responsible for.
94 private final String topologyId;
// Local callback produced by the factory in the constructor; performs the per-node work on this member.
95 private final TopologyManagerCallback delegateTopologyHandler;
// Node ids tracked by this manager; checked in onNodeCreated and removed in onNodeDeleted.
96 private final Set<NodeId> created = new HashSet<>();
// Typed proxies of the peer managers keyed by cluster address; maintained in onReceive.
98 private final Map<Address, TopologyManager> peers = new HashMap<>();
// Random number that only appears in log messages to tell manager instances apart.
99 private final int id = new Random().nextInt();
// Role flag; initialised by the constructor and overwritten in onRoleChanged.
101 private boolean isMaster;
/**
 * Creates a topology manager whose master flag is initially {@code false}.
 *
 * <p>All arguments are handed unchanged to the nine-argument constructor.
 */
public BaseTopologyManager(final ActorSystem system,
                           final BindingNormalizedNodeCodecRegistry codecRegistry,
                           final DataBroker dataBroker,
                           final String topologyId,
                           final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
                           final StateAggregator aggregator,
                           final NodeWriter naSalNodeWriter,
                           final RoleChangeStrategy roleChangeStrategy) {
    this(system,
            codecRegistry,
            dataBroker,
            topologyId,
            topologyManagerCallbackFactory,
            aggregator,
            naSalNodeWriter,
            roleChangeStrategy,
            false);
}
/**
 * Creates a topology manager with an explicit initial role.
 *
 * @param system actor system from which the typed-actor and cluster extensions are obtained
 * @param codecRegistry codec used to convert nodes to and from their normalized form
 * @param dataBroker data broker kept for datastore reads
 * @param topologyId id of the managed topology; also used to build the topology list path
 * @param topologyManagerCallbackFactory factory that creates the delegate callback for this topology
 * @param aggregator combines the results of per-manager operations
 * @param naSalNodeWriter writer that receives the aggregated node state
 * @param roleChangeStrategy strategy this manager registers with in {@code preStart}
 * @param isMaster initial value of the master flag
 */
public BaseTopologyManager(final ActorSystem system,
                           final BindingNormalizedNodeCodecRegistry codecRegistry,
                           final DataBroker dataBroker,
                           final String topologyId,
                           final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
                           final StateAggregator aggregator,
                           final NodeWriter naSalNodeWriter,
                           final RoleChangeStrategy roleChangeStrategy,
                           final boolean isMaster) {

    this.system = system;
    this.typedExtension = TypedActor.get(system);
    this.clusterExtension = Cluster.get(system);
    this.dataBroker = dataBroker;
    this.topologyId = topologyId;
    this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
    this.aggregator = aggregator;
    this.naSalNodeWriter = naSalNodeWriter;
    this.roleChangeStrategy = roleChangeStrategy;
    this.codecRegistry = codecRegistry;

    // election has not yet happened
    this.isMaster = isMaster;

    this.topologyListPath = TopologyUtil.createTopologyListPath(topologyId);

    // The id needs a placeholder, otherwise SLF4J silently drops the argument.
    LOG.debug("Base manager started {}", id);
}
144 public void preStart() {
145 LOG.debug("preStart called");
146 // TODO change to enum, master/slave active/standby
147 roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
148 LOG.debug("candidate registered");
149 clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
153 public void postStop() {
154 LOG.debug("postStop called");
155 clusterExtension.leave(clusterExtension.selfAddress());
156 clusterExtension.unsubscribe(TypedActor.context().self());
// Handles creation of a node.
//
// A node id that is already in `created` is routed to onNodeUpdated instead.
// Otherwise the local delegate's create future is collected together with one
// future per peer, the set is combined by the aggregator, and the outcome is
// written through naSalNodeWriter (init on success, failed state on failure).
// The last statement returns the local delegate's create result on its own.
//
// NOTE(review): this listing shows two return paths (aggregated future vs. local
// delegate) with no visible branch between them, and no statement that adds
// nodeId to `created`; presumably an isMaster check and an add() sit on lines
// missing from this view - confirm against the full file.
160 public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
161 LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
// A repeated create for a known node id is treated as an update.
163 if (created.contains(nodeId)) {
164 LOG.warn("Node{} already exists, triggering update..", nodeId);
165 return onNodeUpdated(nodeId, node);
168 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
// Local create attempt goes first into the list of futures to aggregate.
172 futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
173 // only master should call connect on peers and aggregate futures
174 for (TopologyManager topologyManager : peers.values()) {
175 // convert binding into NormalizedNode for transfer
176 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
178 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
179 LOG.debug("Value {}", normalizedNodeEntry.getValue());
181 // add a future into our futures that gets its completion status from the converted scala future
182 final SettableFuture<Node> settableFuture = SettableFuture.create();
183 futures.add(settableFuture);
184 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
// Bridge the peer's scala future into the Guava future added above.
185 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
187 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
188 if (failure != null) {
189 settableFuture.setException(failure);
// Convert the peer's normalized reply back into a binding Node.
192 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
193 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
194 final Node value = (Node) fromNormalizedNode.getValue();
196 settableFuture.set(value);
198 }, TypedActor.context().dispatcher());
// Combine local and peer attempts into a single result.
201 final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
202 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
204 public void onSuccess(final Node result) {
205 LOG.debug("Futures aggregated succesfully");
206 naSalNodeWriter.init(nodeId, result);
210 public void onFailure(final Throwable t) {
211 // If the combined connection attempt failed, set the node to connection failed
212 LOG.debug("Futures aggregation failed");
213 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
215 }, TypedActor.context().dispatcher());
217 //combine peer futures
218 return aggregatedFuture;
221 // trigger create on this slave
222 return delegateTopologyHandler.onNodeCreated(nodeId, node);
226 public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
227 LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
229 // Master needs to trigger onNodeUpdated on peers and combine results
231 // first cleanup old node
232 final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
233 final SettableFuture<Node> createFuture = SettableFuture.create();
234 final TopologyManager selfProxy = TypedActor.self();
235 final ActorContext context = TypedActor.context();
236 Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
238 public void onSuccess(Void result) {
239 LOG.warn("Delete part of update succesfull, triggering create");
240 // trigger create on all nodes
241 Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
243 public void onSuccess(Node result) {
244 createFuture.set(result);
248 public void onFailure(Throwable t) {
249 createFuture.setException(t);
251 }, context.dispatcher());
255 public void onFailure(Throwable t) {
256 LOG.warn("Delete part of update failed, {}", t);
258 }, context.dispatcher());
262 // Trigger update on this slave
263 return delegateTopologyHandler.onNodeUpdated(nodeId, node);
// Handles deletion of a node.
//
// The node id is removed from `created`. The local delegate's delete future is
// collected together with one future per peer, the set is combined by the
// aggregator, and on success the node is deleted through naSalNodeWriter.
// The last statement returns the local delegate's delete result on its own.
//
// NOTE(review): two return paths are visible with no branch between them;
// presumably the aggregated part is guarded by an isMaster check on a line
// missing from this view - confirm against the full file.
267 public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
268 final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
269 created.remove(nodeId);
271 // Master needs to trigger delete on peers and combine results
273 futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
274 for (TopologyManager topologyManager : peers.values()) {
275 // add a future into our futures that gets its completion status from the converted scala future
276 final SettableFuture<Void> settableFuture = SettableFuture.create();
277 futures.add(settableFuture);
278 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
// Bridge the peer's scala future into the Guava future added above.
279 scalaFuture.onComplete(new OnComplete<Void>() {
281 public void onComplete(Throwable failure, Void success) throws Throwable {
282 if (failure != null) {
283 settableFuture.setException(failure);
287 settableFuture.set(success);
289 }, TypedActor.context().dispatcher());
// Combine local and peer delete attempts into a single result.
292 final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
293 Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
295 public void onSuccess(final Void result) {
296 naSalNodeWriter.delete(nodeId);
// NOTE(review): no handling is visible for a failed aggregation here; the failure
// still reaches the caller through the returned future.
300 public void onFailure(final Throwable t) {
305 return aggregatedFuture;
309 return delegateTopologyHandler.onNodeDeleted(nodeId);
314 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
315 return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
// Reacts to a role change: stores the new ownership flag, forwards the change to
// the local delegate, then logs the mastership message and joins the cluster at
// the local address.
//
// NOTE(review): the log text suggests the last two statements are meant for the
// owner only; no condition is visible in this listing - confirm against the full file.
319 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
320 isMaster = roleChangeDTO.isOwner();
321 delegateTopologyHandler.onRoleChanged(roleChangeDTO);
323 LOG.debug("Node {} is master now", clusterExtension.selfAddress());
324 clusterExtension.join(clusterExtension.selfAddress());
329 public Future<Boolean> isMaster() {
330 return new DefaultPromise<Boolean>().success(isMaster).future();
// Handles a connection status change of a node.
//
// First part: collects the node's current status from the local delegate and from
// every peer, combines them with the aggregator and writes the result (or the
// failed state) through naSalNodeWriter.
// Second part: asks every peer whether it is master and forwards the notification
// to each peer that answers true.
//
// NOTE(review): no branch between the two parts is visible in this listing;
// presumably an isMaster check selects one of them - confirm against the full file.
334 public void notifyNodeStatusChange(final NodeId nodeId) {
335 LOG.debug("Connection status has changed on node {}", nodeId.getValue());
337 // grab status from all peers and aggregate
338 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
339 futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
340 // only master should call connect on peers and aggregate futures
341 for (TopologyManager topologyManager : peers.values()) {
342 // add a future into our futures that gets its completion status from the converted scala future
343 final SettableFuture<Node> settableFuture = SettableFuture.create();
344 futures.add(settableFuture);
345 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
// Bridge the peer's scala future into the Guava future added above.
346 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
348 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
349 if (failure != null) {
350 settableFuture.setException(failure);
// Convert the peer's normalized reply back into a binding Node.
353 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
354 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
355 final Node value = (Node) fromNormalizedNode.getValue();
357 settableFuture.set(value);
359 }, TypedActor.context().dispatcher());
// Combine local and peer statuses into a single result.
362 final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
363 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
365 public void onSuccess(final Node result) {
366 LOG.debug("Futures aggregated succesfully");
367 naSalNodeWriter.update(nodeId, result);
371 public void onFailure(final Throwable t) {
372 // If the combined connection attempt failed, set the node to connection failed
373 LOG.debug("Futures aggregation failed");
// No node configuration is available in this method, hence the null second argument.
374 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
379 LOG.debug("Not master, forwarding..");
380 for (final TopologyManager manager : peers.values()) {
381 // asynchronously find out which peer is master
382 final Future<Boolean> future = manager.isMaster();
383 future.onComplete(new OnComplete<Boolean>() {
385 public void onComplete(Throwable failure, Boolean success) throws Throwable {
386 if (failure == null && success) {
387 LOG.debug("Found master peer");
// Hand the notification over to the peer that reported itself as master.
389 manager.notifyNodeStatusChange(nodeId);
392 if (failure != null) {
393 LOG.debug("Retrieving master peer failed, {}", failure);
396 }, TypedActor.context().dispatcher());
401 public boolean hasAllPeersUp() {
402 LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
403 LOG.warn(clusterExtension.state().toString());
404 LOG.warn(peers.toString());
405 return peers.size() == 2;
409 public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
410 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
411 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
412 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
413 final Node value = (Node) fromNormalizedNode.getValue();
415 LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
416 final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
417 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
418 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
420 public void onSuccess(Node result) {
421 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
422 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
426 public void onFailure(Throwable t) {
431 return promise.future();
435 public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
436 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
437 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
438 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
439 final Node value = (Node) fromNormalizedNode.getValue();
441 LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
443 final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
444 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
445 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
447 public void onSuccess(Node result) {
448 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
449 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
453 public void onFailure(Throwable t) {
457 return promise.future();
461 public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
462 LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
464 final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
465 final DefaultPromise<Void> promise = new DefaultPromise<>();
466 Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
468 public void onSuccess(Void result) {
469 promise.success(null);
473 public void onFailure(Throwable t) {
478 return promise.future();
481 public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
482 LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
484 final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
485 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
486 Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
488 public void onSuccess(Node result) {
489 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), result);
490 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
494 public void onFailure(Throwable t) {
498 return promise.future();
502 public void onReceive(final Object message, final ActorRef actorRef) {
503 LOG.debug("message received {}", message);
504 if (message instanceof MemberUp) {
505 final Member member = ((MemberUp) message).member();
506 LOG.info("Member is Up: {}", member);
507 if (member.address().equals(clusterExtension.selfAddress())) {
511 final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(member.address().toString(), topologyId);
512 final String path = pathCreator.build();
513 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
515 // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
516 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
517 } else if (message instanceof MemberExited) {
519 final Member member = ((MemberExited) message).member();
520 LOG.info("Member exited cluster: {}", member);
521 peers.remove(member.address());
522 } else if (message instanceof MemberRemoved) {
524 final Member member = ((MemberRemoved) message).member();
525 LOG.info("Member was removed from cluster: {}", member);
526 peers.remove(member.address());
527 } else if (message instanceof UnreachableMember) {
529 final Member member = ((UnreachableMember) message).member();
530 LOG.info("Member is unreachable: {}", member);
531 peers.remove(member.address());
532 } else if (message instanceof ReachableMember) {
534 final Member member = ((ReachableMember) message).member();
535 LOG.info("Member is reachable again: {}", member);
537 if (member.address().equals(clusterExtension.selfAddress())) {
540 final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(member.address().toString(), topologyId);
541 final String path = pathCreator.build();
542 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
544 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
545 } else if (message instanceof ActorIdentity) {
546 LOG.debug("Received ActorIdentity message", message);
547 final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(((ActorIdentity) message).correlationId().toString(), topologyId);
548 final String path = pathCreator.build();
549 if (((ActorIdentity) message).getRef() == null) {
550 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
551 final ActorRef self = TypedActor.context().self();
552 final ActorContext context = TypedActor.context();
553 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
556 LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
557 context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
560 }, system.dispatcher());
563 LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
565 clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
566 } else if (message instanceof CustomIdentifyMessageReply) {
568 LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
569 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
570 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
571 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
576 } else if (message instanceof CustomIdentifyMessage) {
577 LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
578 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
579 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
580 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
585 actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
// Replays the configured topology to a peer: reads the topology at topologyListPath
// from the CONFIGURATION datastore and, for every node found, sends the node in
// normalized form to the peer via onRemoteNodeCreated. The futures returned by the
// peer are not inspected.
//
// NOTE(review): no close() of the read-only transaction is visible in this listing,
// and the failure callback logs without the Throwable - confirm against the full
// file whether either is handled on lines missing from this view.
589 private void resyncPeer(final TopologyManager peer) {
590 final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
591 final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
593 Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
595 public void onSuccess(Optional<Topology> result) {
// Nothing is sent when the topology is absent or has no node list.
596 if (result.isPresent() && result.get().getNode() != null) {
597 for (final Node node : result.get().getNode()) {
598 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
599 peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
600 // we dont care about the future from now on since we will be notified by the onConnected event
606 public void onFailure(Throwable t) {
607 LOG.error("Unable to read from datastore");