2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.util;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.HashSet;
40 import java.util.Map.Entry;
41 import java.util.Random;
43 import java.util.concurrent.TimeUnit;
44 import javax.annotation.Nonnull;
45 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
46 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.netconf.topology.RoleChangeStrategy;
50 import org.opendaylight.netconf.topology.StateAggregator;
51 import org.opendaylight.netconf.topology.TopologyManager;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback;
53 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
54 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
55 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
56 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
61 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
62 import org.opendaylight.yangtools.yang.binding.DataObject;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71 import scala.concurrent.impl.Promise.DefaultPromise;
73 public final class BaseTopologyManager
74 implements TopologyManager {
// Class-wide SLF4J logger.
76 private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
// Datastore path of this topology; built in the constructor from topologyId.
78 private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
// Actor system handed to the constructor, and the typed-actor / cluster extensions looked up from it.
80 private final ActorSystem system;
81 private final TypedActorExtension typedExtension;
82 private final Cluster clusterExtension;
// Converts between binding objects (Node) and NormalizedNode for messages exchanged with peers.
84 private final BindingNormalizedNodeCodecRegistry codecRegistry;
// Path segment placed between a member address and the topology id when building a peer's actor path.
86 private static final String PATH = "/user/";
// Collaborators supplied through the constructor.
88 private final DataBroker dataBroker;
89 private final RoleChangeStrategy roleChangeStrategy;
90 private final StateAggregator aggregator;
92 private final NodeWriter naSalNodeWriter;
93 private final String topologyId;
// Local handler created from the callback factory; every node operation is delegated to it.
94 private final TopologyManagerCallback delegateTopologyHandler;
// Node ids checked by onNodeCreated and removed by onNodeDeleted.
// NOTE(review): no add(...) on this set is visible in this view of the file - confirm where ids are inserted.
95 private final Set<NodeId> created = new HashSet<>();
// Typed-actor proxies of the other cluster members, keyed by member address; maintained by onReceive.
97 private final Map<Address, TopologyManager> peers = new HashMap<>();
// Random per-instance number that only appears in log messages.
98 private final int id = new Random().nextInt();
// Current role flag; initialised from the constructor argument and overwritten in onRoleChanged.
100 private boolean isMaster;
/**
 * Creates a topology manager that does not act as master until a role change says otherwise.
 *
 * @param system actor system used to look up the typed-actor and cluster extensions
 * @param codecRegistry codec used to convert nodes to and from their normalized form
 * @param dataBroker broker used for datastore reads
 * @param topologyId id of the topology this manager is responsible for
 * @param topologyManagerCallbackFactory factory producing the local delegate handler
 * @param aggregator combines the per-member results of create/update/delete attempts
 * @param naSalNodeWriter writer that receives the aggregated node state
 * @param roleChangeStrategy strategy this manager registers with as a role candidate
 */
public BaseTopologyManager(final ActorSystem system,
                           final BindingNormalizedNodeCodecRegistry codecRegistry,
                           final DataBroker dataBroker,
                           final String topologyId,
                           final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
                           final StateAggregator aggregator,
                           final NodeWriter naSalNodeWriter,
                           final RoleChangeStrategy roleChangeStrategy) {
    this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator,
            naSalNodeWriter, roleChangeStrategy, false);
}

/**
 * Creates a topology manager with an explicit initial role.
 *
 * @param system actor system used to look up the typed-actor and cluster extensions
 * @param codecRegistry codec used to convert nodes to and from their normalized form
 * @param dataBroker broker used for datastore reads
 * @param topologyId id of the topology this manager is responsible for
 * @param topologyManagerCallbackFactory factory producing the local delegate handler
 * @param aggregator combines the per-member results of create/update/delete attempts
 * @param naSalNodeWriter writer that receives the aggregated node state
 * @param roleChangeStrategy strategy this manager registers with as a role candidate
 * @param isMaster initial value of the master flag
 */
public BaseTopologyManager(final ActorSystem system,
                           final BindingNormalizedNodeCodecRegistry codecRegistry,
                           final DataBroker dataBroker,
                           final String topologyId,
                           final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
                           final StateAggregator aggregator,
                           final NodeWriter naSalNodeWriter,
                           final RoleChangeStrategy roleChangeStrategy,
                           final boolean isMaster) {

    this.system = system;
    this.typedExtension = TypedActor.get(system);
    this.clusterExtension = Cluster.get(system);
    this.dataBroker = dataBroker;
    this.topologyId = topologyId;
    this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
    this.aggregator = aggregator;
    this.naSalNodeWriter = naSalNodeWriter;
    this.roleChangeStrategy = roleChangeStrategy;
    this.codecRegistry = codecRegistry;

    // election has not yet happened
    this.isMaster = isMaster;

    this.topologyListPath = TopologyUtil.createTopologyListPath(topologyId);

    // The message needs a placeholder, otherwise the instance id is silently dropped from the log.
    LOG.debug("Base manager started {}", id);
}
143 public void preStart() {
144 LOG.debug("preStart called");
145 // TODO change to enum, master/slave active/standby
146 roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
147 LOG.debug("candidate registered");
148 clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
152 public void postStop() {
153 LOG.debug("postStop called");
154 clusterExtension.leave(clusterExtension.selfAddress());
155 clusterExtension.unsubscribe(TypedActor.context().self());
/**
 * Handles creation of a topology node.
 *
 * A node id already present in {@code created} is redirected to onNodeUpdated. Otherwise the
 * visible code asks the local delegate and every known peer to create the node, combines the
 * resulting futures through the aggregator and reports the outcome to the node writer.
 *
 * NOTE(review): this view of the file appears to be missing lines (closing braces and the
 * branch that separates the two return paths). The comments "only master ..." and "trigger
 * create on this slave" suggest a master / non-master split - confirm against the full file.
 *
 * @param nodeId id of the node being created
 * @param node node data handed to the delegate and, in normalized form, to the peers
 * @return the aggregated future, or the delegate's own future on the second return path
 */
159 public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
160 LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
// A repeated create for a tracked node is treated as an update.
162 if (created.contains(nodeId)) {
163 LOG.warn("Node{} already exists, triggering update..", nodeId);
164 return onNodeUpdated(nodeId, node);
// Collects one future per participant: the local delegate first, then one per peer.
167 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
171 futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
172 // only master should call connect on peers and aggregate futures
173 for (TopologyManager topologyManager : peers.values()) {
174 // convert binding into NormalizedNode for transfer
175 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
177 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
178 LOG.debug("Value {}", normalizedNodeEntry.getValue());
180 // add a future into our futures that gets its completion status from the converted scala future
181 final SettableFuture<Node> settableFuture = SettableFuture.create();
182 futures.add(settableFuture);
183 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
184 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
// Bridges the peer's scala future into the Guava future added above.
186 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
187 if (failure != null) {
188 settableFuture.setException(failure);
// NOTE(review): the statement ending the failure path (return / else) is presumably on a line not shown here.
// Success path: decode the peer's reply back into a binding Node.
191 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
192 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
193 final Node value = (Node) fromNormalizedNode.getValue();
195 settableFuture.set(value);
197 }, TypedActor.context().dispatcher());
// Let the aggregator reduce the local and peer results to a single node state.
200 final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
201 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
203 public void onSuccess(final Node result) {
204 LOG.debug("Futures aggregated succesfully");
205 naSalNodeWriter.init(nodeId, result);
209 public void onFailure(final Throwable t) {
210 // If the combined connection attempt failed, set the node to connection failed
211 LOG.debug("Futures aggregation failed");
212 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
213 // FIXME disconnect those which succeeded
214 // just issue a delete on delegateTopologyHandler that gets handled on lower level
216 }, TypedActor.context().dispatcher());
218 //combine peer futures
219 return aggregatedFuture;
222 // trigger create on this slave
223 return delegateTopologyHandler.onNodeCreated(nodeId, node);
227 public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
228 LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
230 // Master needs to trigger onNodeUpdated on peers and combine results
232 // first cleanup old node
233 final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
234 final SettableFuture<Node> createFuture = SettableFuture.create();
235 final TopologyManager selfProxy = TypedActor.self();
236 final ActorContext context = TypedActor.context();
237 Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
239 public void onSuccess(Void result) {
240 LOG.warn("Delete part of update succesfull, triggering create");
241 // trigger create on all nodes
242 Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
244 public void onSuccess(Node result) {
245 createFuture.set(result);
249 public void onFailure(Throwable t) {
250 createFuture.setException(t);
252 }, context.dispatcher());
256 public void onFailure(Throwable t) {
257 LOG.warn("Delete part of update failed, {}", t);
259 }, context.dispatcher());
263 // Trigger update on this slave
264 return delegateTopologyHandler.onNodeUpdated(nodeId, node);
/**
 * Handles deletion of a topology node.
 *
 * The node id is removed from {@code created}. The visible code then asks the local delegate
 * and every known peer to delete the node, combines the futures through the aggregator and,
 * on success, deletes the node via the node writer.
 *
 * NOTE(review): this view of the file appears to be missing lines (closing braces and the
 * branch that separates the two return paths); the "Master needs to ..." comment suggests the
 * aggregation part is master-only - confirm against the full file.
 *
 * @param nodeId id of the node being deleted
 * @return the aggregated future, or the delegate's own future on the second return path
 */
268 public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
269 final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
// Forget the node so that a later create is not redirected to an update.
270 created.remove(nodeId);
272 // Master needs to trigger delete on peers and combine results
274 futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
275 for (TopologyManager topologyManager : peers.values()) {
276 // add a future into our futures that gets its completion status from the converted scala future
277 final SettableFuture<Void> settableFuture = SettableFuture.create();
278 futures.add(settableFuture);
279 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
280 scalaFuture.onComplete(new OnComplete<Void>() {
// Bridges the peer's scala future into the Guava future added above.
282 public void onComplete(Throwable failure, Void success) throws Throwable {
283 if (failure != null) {
284 settableFuture.setException(failure);
// NOTE(review): the statement ending the failure path (return / else) is presumably on a line not shown here.
288 settableFuture.set(success);
290 }, TypedActor.context().dispatcher());
// Let the aggregator reduce the local and peer results to a single outcome.
293 final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
294 Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
296 public void onSuccess(final Void result) {
297 naSalNodeWriter.delete(nodeId);
// No failure handling is visible besides the FIXME below.
301 public void onFailure(final Throwable t) {
302 // FIXME unable to disconnect all the connections, what do we do now ?
306 return aggregatedFuture;
310 return delegateTopologyHandler.onNodeDeleted(nodeId);
315 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
316 return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
/**
 * Applies a role change: stores the new ownership flag, forwards the change to the delegate
 * handler and joins the cluster using this member's own address.
 *
 * NOTE(review): the "is master now" log and the join are presumably guarded by a check on a
 * line not shown in this view - confirm whether they run for every role change.
 *
 * @param roleChangeDTO role change notification; isOwner() becomes the new master flag
 */
320 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
321 isMaster = roleChangeDTO.isOwner();
322 delegateTopologyHandler.onRoleChanged(roleChangeDTO);
324 LOG.debug("Node {} is master now", clusterExtension.selfAddress());
325 clusterExtension.join(clusterExtension.selfAddress());
330 public Future<Boolean> isMaster() {
331 return new DefaultPromise<Boolean>().success(isMaster).future();
/**
 * Reacts to a connection status change of a node.
 *
 * The first visible part collects the node's current status from the local delegate and from
 * every peer, combines the results through the aggregator and writes the outcome via the node
 * writer. The second visible part ("Not master, forwarding..") asks every peer whether it is
 * master and forwards the notification to the one that answers true.
 *
 * NOTE(review): the condition choosing between the two parts is not visible in this view -
 * presumably a check of the master flag; confirm against the full file.
 *
 * @param nodeId id of the node whose status changed
 */
335 public void notifyNodeStatusChange(final NodeId nodeId) {
336 LOG.debug("Connection status has changed on node {}", nodeId.getValue());
338 // grab status from all peers and aggregate
339 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
340 futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
341 // only master should call connect on peers and aggregate futures
342 for (TopologyManager topologyManager : peers.values()) {
343 // add a future into our futures that gets its completion status from the converted scala future
344 final SettableFuture<Node> settableFuture = SettableFuture.create();
345 futures.add(settableFuture);
346 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
347 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
// Bridges the peer's scala future into the Guava future added above.
349 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
350 if (failure != null) {
351 settableFuture.setException(failure);
// NOTE(review): the statement ending the failure path (return / else) is presumably on a line not shown here.
// Success path: decode the peer's reply back into a binding Node.
354 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
355 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
356 final Node value = (Node) fromNormalizedNode.getValue();
358 settableFuture.set(value);
360 }, TypedActor.context().dispatcher());
// Let the aggregator reduce the local and peer statuses to a single node state.
363 final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
364 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
366 public void onSuccess(final Node result) {
367 LOG.debug("Futures aggregated succesfully");
368 naSalNodeWriter.update(nodeId, result);
372 public void onFailure(final Throwable t) {
373 // If the combined connection attempt failed, set the node to connection failed
374 LOG.debug("Futures aggregation failed");
// Note that null is passed as the node argument here, unlike the create path which passes the node.
375 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
376 // FIXME disconnect those which succeeded
377 // just issue a delete on delegateTopologyHandler that gets handled on lower level
// Forwarding part: every peer is asked for its role; nothing is written locally.
382 LOG.debug("Not master, forwarding..");
383 for (final TopologyManager manager : peers.values()) {
384 // asynchronously find out which peer is master
385 final Future<Boolean> future = manager.isMaster();
386 future.onComplete(new OnComplete<Boolean>() {
388 public void onComplete(Throwable failure, Boolean success) throws Throwable {
389 if (failure == null && success) {
390 LOG.debug("Found master peer");
392 manager.notifyNodeStatusChange(nodeId);
// A failed role query is only logged.
395 if (failure != null) {
396 LOG.debug("Retrieving master peer failed, {}", failure);
399 }, TypedActor.context().dispatcher());
404 public boolean hasAllPeersUp() {
405 LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
406 LOG.warn(clusterExtension.state().toString());
407 LOG.warn(peers.toString());
408 return peers.size() == 2;
412 public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
413 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
414 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
415 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
416 final Node value = (Node) fromNormalizedNode.getValue();
418 LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
419 final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
420 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
421 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
423 public void onSuccess(Node result) {
424 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
425 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
429 public void onFailure(Throwable t) {
434 return promise.future();
438 public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
439 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
440 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
441 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
442 final Node value = (Node) fromNormalizedNode.getValue();
444 LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
446 final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
447 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
448 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
450 public void onSuccess(Node result) {
451 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
452 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
456 public void onFailure(Throwable t) {
460 return promise.future();
464 public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
465 LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
467 final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
468 final DefaultPromise<Void> promise = new DefaultPromise<>();
469 Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
471 public void onSuccess(Void result) {
472 promise.success(null);
476 public void onFailure(Throwable t) {
481 return promise.future();
484 public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
485 LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
487 final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
488 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
489 Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
491 public void onSuccess(Node result) {
492 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), result);
493 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
497 public void onFailure(Throwable t) {
501 return promise.future();
/**
 * Handles untyped messages delivered to this typed actor: cluster membership events and the
 * identify handshake used to discover the topology actors of other members.
 *
 * Handshake as visible here: for a member that is up or reachable again an Identify message is
 * sent to the actor at {@code <member address> + "/user/" + topologyId}. An ActorIdentity
 * without a ref schedules a retry after 5 seconds, otherwise a CustomIdentifyMessage carrying
 * this member's address is sent. Receiving a CustomIdentifyMessage or its reply registers the
 * other side in {@code peers}; a CustomIdentifyMessage is additionally answered with a
 * CustomIdentifyMessageReply.
 *
 * NOTE(review): this view of the file appears to be missing lines (closing braces, the bodies
 * of the self-address checks, possibly further statements after peers.put) - confirm against
 * the full file before changing the control flow.
 *
 * @param message the received message
 * @param actorRef actor reference delivered with the message; used as the peer's ref and as
 *        the target of the reply (presumably the sender - confirm)
 */
505 public void onReceive(final Object message, final ActorRef actorRef) {
506 LOG.debug("message received {}", message);
507 if (message instanceof MemberUp) {
508 final Member member = ((MemberUp) message).member();
509 LOG.info("Member is Up: {}", member);
// NOTE(review): the body of this self-address check is not visible; presumably it skips identifying itself.
510 if (member.address().equals(clusterExtension.selfAddress())) {
514 final String path = member.address() + PATH + topologyId;
515 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
517 // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
// The member address travels as the correlation id and is read back in the ActorIdentity branch.
518 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
519 } else if (message instanceof MemberExited) {
// Exited, removed and unreachable members are all dropped from the peer map.
521 final Member member = ((MemberExited) message).member();
522 LOG.info("Member exited cluster: {}", member);
523 peers.remove(member.address());
524 } else if (message instanceof MemberRemoved) {
526 final Member member = ((MemberRemoved) message).member();
527 LOG.info("Member was removed from cluster: {}", member);
528 peers.remove(member.address());
529 } else if (message instanceof UnreachableMember) {
531 final Member member = ((UnreachableMember) message).member();
532 LOG.info("Member is unreachable: {}", member);
533 peers.remove(member.address());
534 } else if (message instanceof ReachableMember) {
// A member that is reachable again goes through the same identify step as a new member.
536 final Member member = ((ReachableMember) message).member();
537 LOG.info("Member is reachable again: {}", member);
// NOTE(review): the body of this self-address check is not visible; presumably it skips identifying itself.
539 if (member.address().equals(clusterExtension.selfAddress())) {
543 final String path = member.address() + PATH + topologyId;
544 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
546 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
547 } else if (message instanceof ActorIdentity) {
// NOTE(review): this log message has no {} placeholder, so the message argument is not printed.
548 LOG.debug("Received ActorIdentity message", message);
// Rebuild the remote actor path from the correlation id (the member address sent with Identify).
549 final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
550 if (((ActorIdentity) message).getRef() == null) {
// NOTE(review): this log message has no {} placeholder either.
551 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
552 final ActorRef self = TypedActor.context().self();
553 final ActorContext context = TypedActor.context();
// No actor answered at that path yet: send Identify again after 5 seconds.
554 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
557 LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
558 context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
561 }, system.dispatcher());
// NOTE(review): whether the custom identify below is skipped on the retry path depends on lines not shown here.
564 LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
566 clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
567 } else if (message instanceof CustomIdentifyMessageReply) {
569 LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
// NOTE(review): this casts to CustomIdentifyMessage inside the reply branch; valid only if the reply type is a subtype - confirm.
570 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
// Wrap the remote actor ref in a typed proxy and remember it under the peer's address.
571 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
572 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
577 } else if (message instanceof CustomIdentifyMessage) {
578 LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
579 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
580 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
581 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
// Answer with this member's address so the other side can register it as a peer too.
586 actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
590 private void resyncPeer(final TopologyManager peer) {
591 final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
592 final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
594 Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
596 public void onSuccess(Optional<Topology> result) {
597 if (result.isPresent() && result.get().getNode() != null) {
598 for (final Node node : result.get().getNode()) {
599 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
600 peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
601 // we dont care about the future from now on since we will be notified by the onConnected event
607 public void onFailure(Throwable t) {
608 LOG.error("Unable to read from datastore");