/*
 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.netconf.topology.util;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
39 import java.util.Map.Entry;
40 import java.util.Random;
41 import java.util.concurrent.TimeUnit;
42 import javax.annotation.Nonnull;
43 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
44 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
45 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
46 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
47 import org.opendaylight.netconf.topology.RoleChangeStrategy;
48 import org.opendaylight.netconf.topology.StateAggregator;
49 import org.opendaylight.netconf.topology.TopologyManager;
50 import org.opendaylight.netconf.topology.TopologyManagerCallback;
51 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
52 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
53 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
54 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
61 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
62 import org.opendaylight.yangtools.yang.binding.DataObject;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71 import scala.concurrent.impl.Promise.DefaultPromise;
73 public final class BaseTopologyManager
74 implements TopologyManager {
76 private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
77 private static final InstanceIdentifier<NetworkTopology> NETWORK_TOPOLOGY_PATH = InstanceIdentifier.builder(NetworkTopology.class).build();
79 private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
81 private final ActorSystem system;
82 private final TypedActorExtension typedExtension;
83 private final Cluster clusterExtension;
85 private final BindingNormalizedNodeCodecRegistry codecRegistry;
87 private static final String PATH = "/user/";
89 private final DataBroker dataBroker;
90 private final RoleChangeStrategy roleChangeStrategy;
91 private final StateAggregator aggregator;
93 private final NodeWriter naSalNodeWriter;
94 private final String topologyId;
95 private final TopologyManagerCallback delegateTopologyHandler;
97 private final Map<Address, TopologyManager> peers = new HashMap<>();
98 private TopologyManager masterPeer = null;
99 private final int id = new Random().nextInt();
101 private boolean isMaster;
/**
 * Convenience constructor: builds a manager whose initial {@code isMaster} flag is {@code false}.
 *
 * <p>All arguments are forwarded unchanged to the full constructor.
 */
public BaseTopologyManager(final ActorSystem system,
                           final BindingNormalizedNodeCodecRegistry codecRegistry,
                           final DataBroker dataBroker,
                           final String topologyId,
                           final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
                           final StateAggregator aggregator,
                           final NodeWriter naSalNodeWriter,
                           final RoleChangeStrategy roleChangeStrategy) {
    this(system,
            codecRegistry,
            dataBroker,
            topologyId,
            topologyManagerCallbackFactory,
            aggregator,
            naSalNodeWriter,
            roleChangeStrategy,
            false);
}
/**
 * Creates a topology manager for a single topology.
 *
 * @param system actor system; also the source of the typed-actor and cluster extensions
 * @param codecRegistry codec used to convert between binding objects and normalized nodes
 * @param dataBroker broker used for datastore reads
 * @param topologyId id of the topology handled by this manager
 * @param topologyManagerCallbackFactory factory producing the delegate that handles node events locally
 * @param aggregator combines the per-member results of create/update/delete attempts
 * @param naSalNodeWriter writer receiving the aggregated node state
 * @param roleChangeStrategy strategy this manager registers with as a role candidate in {@code preStart}
 * @param isMaster initial value of the master flag; later overwritten by {@code onRoleChanged}
 */
public BaseTopologyManager(final ActorSystem system,
                           final BindingNormalizedNodeCodecRegistry codecRegistry,
                           final DataBroker dataBroker,
                           final String topologyId,
                           final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
                           final StateAggregator aggregator,
                           final NodeWriter naSalNodeWriter,
                           final RoleChangeStrategy roleChangeStrategy,
                           final boolean isMaster) {

    this.system = system;
    this.typedExtension = TypedActor.get(system);
    this.clusterExtension = Cluster.get(system);
    this.dataBroker = dataBroker;
    this.topologyId = topologyId;
    this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
    this.aggregator = aggregator;
    this.naSalNodeWriter = naSalNodeWriter;
    this.roleChangeStrategy = roleChangeStrategy;
    this.codecRegistry = codecRegistry;

    // election has not yet happened
    this.isMaster = isMaster;

    this.topologyListPath = NETWORK_TOPOLOGY_PATH.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));

    // The message previously had no "{}" placeholder, so the id argument was silently discarded.
    LOG.debug("Base manager started {}", id);
}
144 public void preStart() {
145 LOG.debug("preStart called");
146 // TODO change to enum, master/slave active/standby
147 roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
148 LOG.debug("candidate registered");
149 clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
153 public void postStop() {
154 LOG.debug("postStop called");
155 clusterExtension.leave(clusterExtension.selfAddress());
156 clusterExtension.unsubscribe(TypedActor.context().self());
160 public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
161 LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
163 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
167 futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
168 // only master should call connect on peers and aggregate futures
169 for (TopologyManager topologyManager : peers.values()) {
170 // convert binding into NormalizedNode for transfer
171 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
173 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
174 LOG.debug("Value {}", normalizedNodeEntry.getValue());
176 // add a future into our futures that gets its completion status from the converted scala future
177 final SettableFuture<Node> settableFuture = SettableFuture.create();
178 futures.add(settableFuture);
179 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
180 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
182 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
183 if (failure != null) {
184 settableFuture.setException(failure);
187 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
188 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
189 final Node value = (Node) fromNormalizedNode.getValue();
191 settableFuture.set(value);
193 }, TypedActor.context().dispatcher());
196 final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
197 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
199 public void onSuccess(final Node result) {
200 LOG.debug("Futures aggregated succesfully");
201 naSalNodeWriter.init(nodeId, result);
205 public void onFailure(final Throwable t) {
206 // If the combined connection attempt failed, set the node to connection failed
207 LOG.debug("Futures aggregation failed");
208 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
209 // FIXME disconnect those which succeeded
210 // just issue a delete on delegateTopologyHandler that gets handled on lower level
212 }, TypedActor.context().dispatcher());
214 //combine peer futures
215 return aggregatedFuture;
218 // trigger create on this slave
219 return delegateTopologyHandler.onNodeCreated(nodeId, node);
223 public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
224 LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
226 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
228 // Master needs to trigger onNodeUpdated on peers and combine results
230 futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
231 for (TopologyManager topologyManager : peers.values()) {
232 // convert binding into NormalizedNode for transfer
233 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
235 // add a future into our futures that gets its completion status from the converted scala future
236 final SettableFuture<Node> settableFuture = SettableFuture.create();
237 futures.add(settableFuture);
238 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
239 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
241 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
242 if (failure != null) {
243 settableFuture.setException(failure);
246 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
247 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
248 final Node value = (Node) fromNormalizedNode.getValue();
250 settableFuture.set(value);
252 }, TypedActor.context().dispatcher());
255 final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
256 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
258 public void onSuccess(final Node result) {
259 // FIXME make this (writing state data for nodes) optional and customizable
260 // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
261 naSalNodeWriter.update(nodeId, result);
265 public void onFailure(final Throwable t) {
266 // If the combined connection attempt failed, set the node to connection failed
267 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
268 // FIXME disconnect those which succeeded
269 // just issue a delete on delegateTopologyHandler that gets handled on lower level
273 //combine peer futures
274 return aggregatedFuture;
277 // Trigger update on this slave
278 return delegateTopologyHandler.onNodeUpdated(nodeId, node);
281 private static InstanceIdentifier<Node> getNodeIid(final String topologyId) {
282 final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
283 return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Node.class);
287 public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
288 final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
290 // Master needs to trigger delete on peers and combine results
292 futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
293 for (TopologyManager topologyManager : peers.values()) {
294 // add a future into our futures that gets its completion status from the converted scala future
295 final SettableFuture<Void> settableFuture = SettableFuture.create();
296 futures.add(settableFuture);
297 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
298 scalaFuture.onComplete(new OnComplete<Void>() {
300 public void onComplete(Throwable failure, Void success) throws Throwable {
301 if (failure != null) {
302 settableFuture.setException(failure);
306 settableFuture.set(success);
308 }, TypedActor.context().dispatcher());
311 final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
312 Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
314 public void onSuccess(final Void result) {
315 naSalNodeWriter.delete(nodeId);
319 public void onFailure(final Throwable t) {
320 // FIXME unable to disconnect all the connections, what do we do now ?
324 return aggregatedFuture;
328 return delegateTopologyHandler.onNodeDeleted(nodeId);
333 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
334 return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
338 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
339 isMaster = roleChangeDTO.isOwner();
340 delegateTopologyHandler.onRoleChanged(roleChangeDTO);
342 LOG.debug("Node {} is master now", clusterExtension.selfAddress());
343 clusterExtension.join(clusterExtension.selfAddress());
348 public Future<Boolean> isMaster() {
349 return new DefaultPromise<Boolean>().success(isMaster).future();
353 public void notifyNodeStatusChange(final NodeId nodeId) {
354 LOG.debug("Connection status has changed on node {}", nodeId.getValue());
356 // grab status from all peers and aggregate
357 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
358 futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
359 // only master should call connect on peers and aggregate futures
360 for (TopologyManager topologyManager : peers.values()) {
361 // add a future into our futures that gets its completion status from the converted scala future
362 final SettableFuture<Node> settableFuture = SettableFuture.create();
363 futures.add(settableFuture);
364 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
365 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
367 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
368 if (failure != null) {
369 settableFuture.setException(failure);
372 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
373 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
374 final Node value = (Node) fromNormalizedNode.getValue();
376 settableFuture.set(value);
378 }, TypedActor.context().dispatcher());
381 final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
382 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
384 public void onSuccess(final Node result) {
385 LOG.debug("Futures aggregated succesfully");
386 naSalNodeWriter.update(nodeId, result);
390 public void onFailure(final Throwable t) {
391 // If the combined connection attempt failed, set the node to connection failed
392 LOG.debug("Futures aggregation failed");
393 naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
394 // FIXME disconnect those which succeeded
395 // just issue a delete on delegateTopologyHandler that gets handled on lower level
400 LOG.debug("Not master, forwarding..");
401 for (final TopologyManager manager : peers.values()) {
402 // asynchronously find out which peer is master
403 final Future<Boolean> future = manager.isMaster();
404 future.onComplete(new OnComplete<Boolean>() {
406 public void onComplete(Throwable failure, Boolean success) throws Throwable {
407 if (failure == null && success) {
408 LOG.debug("Found master peer");
410 manager.notifyNodeStatusChange(nodeId);
413 if (failure != null) {
414 LOG.debug("Retrieving master peer failed, {}", failure);
417 }, TypedActor.context().dispatcher());
422 public boolean hasAllPeersUp() {
423 LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
424 LOG.warn(clusterExtension.state().toString());
425 LOG.warn(peers.toString());
426 return peers.size() == 2;
430 public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
431 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
432 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
433 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
434 final Node value = (Node) fromNormalizedNode.getValue();
436 LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
437 final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
438 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
439 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
441 public void onSuccess(Node result) {
442 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
443 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
447 public void onFailure(Throwable t) {
452 return promise.future();
456 public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
457 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
458 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
459 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
460 final Node value = (Node) fromNormalizedNode.getValue();
462 LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
464 final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
465 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
466 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
468 public void onSuccess(Node result) {
469 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
470 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
474 public void onFailure(Throwable t) {
478 return promise.future();
482 public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
483 LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
485 final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
486 final DefaultPromise<Void> promise = new DefaultPromise<>();
487 Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
489 public void onSuccess(Void result) {
490 promise.success(null);
494 public void onFailure(Throwable t) {
499 return promise.future();
502 public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
503 LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
505 final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
506 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
507 Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
509 public void onSuccess(Node result) {
510 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), result);
511 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
515 public void onFailure(Throwable t) {
519 return promise.future();
523 public void onReceive(final Object message, final ActorRef actorRef) {
524 LOG.debug("message received {}", message);
525 if (message instanceof MemberUp) {
526 final Member member = ((MemberUp) message).member();
527 LOG.info("Member is Up: {}", member);
528 if (member.address().equals(clusterExtension.selfAddress())) {
532 final String path = member.address() + PATH + topologyId;
533 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
535 // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
536 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
537 } else if (message instanceof MemberExited) {
539 final Member member = ((MemberExited) message).member();
540 LOG.info("Member exited cluster: {}", member);
541 peers.remove(member.address());
542 } else if (message instanceof MemberRemoved) {
544 final Member member = ((MemberRemoved) message).member();
545 LOG.info("Member was removed from cluster: {}", member);
546 peers.remove(member.address());
547 } else if (message instanceof UnreachableMember) {
549 final Member member = ((UnreachableMember) message).member();
550 LOG.info("Member is unreachable: {}", member);
551 peers.remove(member.address());
552 } else if (message instanceof ReachableMember) {
554 final Member member = ((ReachableMember) message).member();
555 LOG.info("Member is reachable again: {}", member);
557 if (member.address().equals(clusterExtension.selfAddress())) {
561 final String path = member.address() + PATH + topologyId;
562 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
564 clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
565 } else if (message instanceof ActorIdentity) {
566 LOG.debug("Received ActorIdentity message", message);
567 final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
568 if (((ActorIdentity) message).getRef() == null) {
569 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
570 final ActorRef self = TypedActor.context().self();
571 final ActorContext context = TypedActor.context();
572 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
575 LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
576 context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
579 }, system.dispatcher());
582 LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
584 clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
585 } else if (message instanceof CustomIdentifyMessageReply) {
587 LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
588 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
589 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
590 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
595 } else if (message instanceof CustomIdentifyMessage) {
596 LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
597 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
598 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
599 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
604 actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
608 private void resyncPeer(final TopologyManager peer) {
609 final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
610 final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
612 Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
614 public void onSuccess(Optional<Topology> result) {
615 if (result.isPresent()) {
616 for (final Node node : result.get().getNode()) {
617 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
618 peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
619 // we dont care about the future from now on since we will be notified by the onConnected event
625 public void onFailure(Throwable t) {
626 LOG.error("Unable to read from datastore");