2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.util;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.HashSet;
40 import java.util.Map.Entry;
41 import java.util.Random;
43 import java.util.concurrent.TimeUnit;
44 import javax.annotation.Nonnull;
45 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
46 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.netconf.topology.RoleChangeStrategy;
50 import org.opendaylight.netconf.topology.StateAggregator;
51 import org.opendaylight.netconf.topology.TopologyManager;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback;
53 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
54 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
55 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
56 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
63 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
64 import org.opendaylight.yangtools.yang.binding.DataObject;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71 import scala.concurrent.Future;
72 import scala.concurrent.duration.FiniteDuration;
73 import scala.concurrent.impl.Promise.DefaultPromise;
75 public final class BaseTopologyManager
76 implements TopologyManager {
78 private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
79 private static final InstanceIdentifier<NetworkTopology> NETWORK_TOPOLOGY_PATH = InstanceIdentifier.builder(NetworkTopology.class).build();
81 private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
83 private final ActorSystem system;
84 private final TypedActorExtension typedExtension;
85 private final Cluster clusterExtension;
87 private final BindingNormalizedNodeCodecRegistry codecRegistry;
89 private static final String PATH = "/user/";
91 private final DataBroker dataBroker;
92 private final RoleChangeStrategy roleChangeStrategy;
93 private final StateAggregator aggregator;
95 private final NodeWriter naSalNodeWriter;
96 private final String topologyId;
97 private final TopologyManagerCallback delegateTopologyHandler;
98 private final Set<NodeId> created = new HashSet<>();
100 private final Map<Address, TopologyManager> peers = new HashMap<>();
101 private TopologyManager masterPeer = null;
102 private final int id = new Random().nextInt();
104 private boolean isMaster;
106 public BaseTopologyManager(final ActorSystem system,
107 final BindingNormalizedNodeCodecRegistry codecRegistry,
108 final DataBroker dataBroker,
109 final String topologyId,
110 final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
111 final StateAggregator aggregator,
112 final NodeWriter naSalNodeWriter,
113 final RoleChangeStrategy roleChangeStrategy) {
114 this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
117 public BaseTopologyManager(final ActorSystem system,
118 final BindingNormalizedNodeCodecRegistry codecRegistry,
119 final DataBroker dataBroker,
120 final String topologyId,
121 final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
122 final StateAggregator aggregator,
123 final NodeWriter naSalNodeWriter,
124 final RoleChangeStrategy roleChangeStrategy,
125 final boolean isMaster) {
127 this.system = system;
128 this.typedExtension = TypedActor.get(system);
129 this.clusterExtension = Cluster.get(system);
130 this.dataBroker = dataBroker;
131 this.topologyId = topologyId;
132 this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
133 this.aggregator = aggregator;
134 this.naSalNodeWriter = naSalNodeWriter;
135 this.roleChangeStrategy = roleChangeStrategy;
136 this.codecRegistry = codecRegistry;
138 // election has not yet happened
139 this.isMaster = isMaster;
141 this.topologyListPath = NETWORK_TOPOLOGY_PATH.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
143 LOG.debug("Base manager started ", +id);
147 public void preStart() {
148 LOG.debug("preStart called");
149 // TODO change to enum, master/slave active/standby
150 roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
151 LOG.debug("candidate registered");
152 clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
156 public void postStop() {
157 LOG.debug("postStop called");
158 clusterExtension.leave(clusterExtension.selfAddress());
159 clusterExtension.unsubscribe(TypedActor.context().self());
163 public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
164 LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
166 if (created.contains(nodeId)) {
167 LOG.warn("Node{} already exists, triggering update..", nodeId);
168 return onNodeUpdated(nodeId, node);
171 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
175 futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
176 // only master should call connect on peers and aggregate futures
177 for (TopologyManager topologyManager : peers.values()) {
178 // convert binding into NormalizedNode for transfer
179 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
181 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
182 LOG.debug("Value {}", normalizedNodeEntry.getValue());
184 // add a future into our futures that gets its completion status from the converted scala future
185 final SettableFuture<Node> settableFuture = SettableFuture.create();
186 futures.add(settableFuture);
187 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
188 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
190 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
191 if (failure != null) {
192 settableFuture.setException(failure);
195 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
196 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
197 final Node value = (Node) fromNormalizedNode.getValue();
199 settableFuture.set(value);
201 }, TypedActor.context().dispatcher());
204 final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
205 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
207 public void onSuccess(final Node result) {
208 LOG.debug("Futures aggregated succesfully");
209 naSalNodeWriter.init(nodeId, result);
213 public void onFailure(final Throwable t) {
214 // If the combined connection attempt failed, set the node to connection failed
215 LOG.debug("Futures aggregation failed");
216 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
217 // FIXME disconnect those which succeeded
218 // just issue a delete on delegateTopologyHandler that gets handled on lower level
220 }, TypedActor.context().dispatcher());
222 //combine peer futures
223 return aggregatedFuture;
226 // trigger create on this slave
227 return delegateTopologyHandler.onNodeCreated(nodeId, node);
231 public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
232 LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
234 // Master needs to trigger onNodeUpdated on peers and combine results
236 // first cleanup old node
237 final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
238 final SettableFuture<Node> createFuture = SettableFuture.create();
239 final TopologyManager selfProxy = TypedActor.self();
240 final ActorContext context = TypedActor.context();
241 Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
243 public void onSuccess(Void result) {
244 LOG.warn("Delete part of update succesfull, triggering create");
245 // trigger create on all nodes
246 Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
248 public void onSuccess(Node result) {
249 createFuture.set(result);
253 public void onFailure(Throwable t) {
254 createFuture.setException(t);
256 }, context.dispatcher());
260 public void onFailure(Throwable t) {
261 LOG.warn("Delete part of update failed, {}", t);
263 }, context.dispatcher());
267 // Trigger update on this slave
268 return delegateTopologyHandler.onNodeUpdated(nodeId, node);
271 private static InstanceIdentifier<Node> getNodeIid(final String topologyId) {
272 final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
273 return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Node.class);
277 public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
278 final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
279 created.remove(nodeId);
281 // Master needs to trigger delete on peers and combine results
283 futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
284 for (TopologyManager topologyManager : peers.values()) {
285 // add a future into our futures that gets its completion status from the converted scala future
286 final SettableFuture<Void> settableFuture = SettableFuture.create();
287 futures.add(settableFuture);
288 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
289 scalaFuture.onComplete(new OnComplete<Void>() {
291 public void onComplete(Throwable failure, Void success) throws Throwable {
292 if (failure != null) {
293 settableFuture.setException(failure);
297 settableFuture.set(success);
299 }, TypedActor.context().dispatcher());
302 final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
303 Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
305 public void onSuccess(final Void result) {
306 naSalNodeWriter.delete(nodeId);
310 public void onFailure(final Throwable t) {
311 // FIXME unable to disconnect all the connections, what do we do now ?
315 return aggregatedFuture;
319 return delegateTopologyHandler.onNodeDeleted(nodeId);
324 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
325 return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
329 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
330 isMaster = roleChangeDTO.isOwner();
331 delegateTopologyHandler.onRoleChanged(roleChangeDTO);
333 LOG.debug("Node {} is master now", clusterExtension.selfAddress());
334 clusterExtension.join(clusterExtension.selfAddress());
339 public Future<Boolean> isMaster() {
340 return new DefaultPromise<Boolean>().success(isMaster).future();
344 public void notifyNodeStatusChange(final NodeId nodeId) {
345 LOG.debug("Connection status has changed on node {}", nodeId.getValue());
347 // grab status from all peers and aggregate
348 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
349 futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
350 // only master should call connect on peers and aggregate futures
351 for (TopologyManager topologyManager : peers.values()) {
352 // add a future into our futures that gets its completion status from the converted scala future
353 final SettableFuture<Node> settableFuture = SettableFuture.create();
354 futures.add(settableFuture);
355 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
356 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
358 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
359 if (failure != null) {
360 settableFuture.setException(failure);
363 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
364 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
365 final Node value = (Node) fromNormalizedNode.getValue();
367 settableFuture.set(value);
369 }, TypedActor.context().dispatcher());
372 final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
373 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
375 public void onSuccess(final Node result) {
376 LOG.debug("Futures aggregated succesfully");
377 naSalNodeWriter.update(nodeId, result);
381 public void onFailure(final Throwable t) {
382 // If the combined connection attempt failed, set the node to connection failed
383 LOG.debug("Futures aggregation failed");
384 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
385 // FIXME disconnect those which succeeded
386 // just issue a delete on delegateTopologyHandler that gets handled on lower level
391 LOG.debug("Not master, forwarding..");
392 for (final TopologyManager manager : peers.values()) {
393 // asynchronously find out which peer is master
394 final Future<Boolean> future = manager.isMaster();
395 future.onComplete(new OnComplete<Boolean>() {
397 public void onComplete(Throwable failure, Boolean success) throws Throwable {
398 if (failure == null && success) {
399 LOG.debug("Found master peer");
401 manager.notifyNodeStatusChange(nodeId);
404 if (failure != null) {
405 LOG.debug("Retrieving master peer failed, {}", failure);
408 }, TypedActor.context().dispatcher());
413 public boolean hasAllPeersUp() {
414 LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
415 LOG.warn(clusterExtension.state().toString());
416 LOG.warn(peers.toString());
417 return peers.size() == 2;
421 public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
422 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
423 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
424 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
425 final Node value = (Node) fromNormalizedNode.getValue();
427 LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
428 final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
429 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
430 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
432 public void onSuccess(Node result) {
433 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
434 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
438 public void onFailure(Throwable t) {
443 return promise.future();
447 public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
448 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
449 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
450 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
451 final Node value = (Node) fromNormalizedNode.getValue();
453 LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
455 final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
456 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
457 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
459 public void onSuccess(Node result) {
460 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
461 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
465 public void onFailure(Throwable t) {
469 return promise.future();
473 public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
474 LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
476 final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
477 final DefaultPromise<Void> promise = new DefaultPromise<>();
478 Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
480 public void onSuccess(Void result) {
481 promise.success(null);
485 public void onFailure(Throwable t) {
490 return promise.future();
493 public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
494 LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
496 final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
497 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
498 Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
500 public void onSuccess(Node result) {
501 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), result);
502 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
506 public void onFailure(Throwable t) {
510 return promise.future();
514 public void onReceive(final Object message, final ActorRef actorRef) {
515 LOG.debug("message received {}", message);
516 if (message instanceof MemberUp) {
517 final Member member = ((MemberUp) message).member();
518 LOG.info("Member is Up: {}", member);
519 if (member.address().equals(clusterExtension.selfAddress())) {
523 final String path = member.address() + PATH + topologyId;
524 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
526 // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
527 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
528 } else if (message instanceof MemberExited) {
530 final Member member = ((MemberExited) message).member();
531 LOG.info("Member exited cluster: {}", member);
532 peers.remove(member.address());
533 } else if (message instanceof MemberRemoved) {
535 final Member member = ((MemberRemoved) message).member();
536 LOG.info("Member was removed from cluster: {}", member);
537 peers.remove(member.address());
538 } else if (message instanceof UnreachableMember) {
540 final Member member = ((UnreachableMember) message).member();
541 LOG.info("Member is unreachable: {}", member);
542 peers.remove(member.address());
543 } else if (message instanceof ReachableMember) {
545 final Member member = ((ReachableMember) message).member();
546 LOG.info("Member is reachable again: {}", member);
548 if (member.address().equals(clusterExtension.selfAddress())) {
552 final String path = member.address() + PATH + topologyId;
553 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
555 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
556 } else if (message instanceof ActorIdentity) {
557 LOG.debug("Received ActorIdentity message", message);
558 final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
559 if (((ActorIdentity) message).getRef() == null) {
560 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
561 final ActorRef self = TypedActor.context().self();
562 final ActorContext context = TypedActor.context();
563 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
566 LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
567 context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
570 }, system.dispatcher());
573 LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
575 clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
576 } else if (message instanceof CustomIdentifyMessageReply) {
578 LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
579 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
580 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
581 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
586 } else if (message instanceof CustomIdentifyMessage) {
587 LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
588 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
589 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
590 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
595 actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
599 private void resyncPeer(final TopologyManager peer) {
600 final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
601 final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
603 Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
605 public void onSuccess(Optional<Topology> result) {
606 if (result.isPresent()) {
607 for (final Node node : result.get().getNode()) {
608 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
609 peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
610 // we dont care about the future from now on since we will be notified by the onConnected event
616 public void onFailure(Throwable t) {
617 LOG.error("Unable to read from datastore");