More netconf-topology module unit tests
[netconf.git] / netconf / abstract-topology / src / main / java / org / opendaylight / netconf / topology / util / BaseTopologyManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.util;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Random;
42 import java.util.Set;
43 import java.util.concurrent.TimeUnit;
44 import javax.annotation.Nonnull;
45 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
46 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.netconf.topology.RoleChangeStrategy;
50 import org.opendaylight.netconf.topology.StateAggregator;
51 import org.opendaylight.netconf.topology.TopologyManager;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback;
53 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
54 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
55 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
56 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
61 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
62 import org.opendaylight.yangtools.yang.binding.DataObject;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71 import scala.concurrent.impl.Promise.DefaultPromise;
72
73 public final class BaseTopologyManager
74         implements TopologyManager {
75
76     private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
77
78     private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
79
80     private final ActorSystem system;
81     private final TypedActorExtension typedExtension;
82     private final Cluster clusterExtension;
83
84     private final BindingNormalizedNodeCodecRegistry codecRegistry;
85
86     private static final String PATH = "/user/";
87
88     private final DataBroker dataBroker;
89     private final RoleChangeStrategy roleChangeStrategy;
90     private final StateAggregator aggregator;
91
92     private final NodeWriter naSalNodeWriter;
93     private final String topologyId;
94     private final TopologyManagerCallback delegateTopologyHandler;
95     private final Set<NodeId> created = new HashSet<>();
96
97     private final Map<Address, TopologyManager> peers = new HashMap<>();
98     private final int id = new Random().nextInt();
99
100     private boolean isMaster;
101
102     public BaseTopologyManager(final ActorSystem system,
103                                final BindingNormalizedNodeCodecRegistry codecRegistry,
104                                final DataBroker dataBroker,
105                                final String topologyId,
106                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
107                                final StateAggregator aggregator,
108                                final NodeWriter naSalNodeWriter,
109                                final RoleChangeStrategy roleChangeStrategy) {
110         this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
111     }
112
113     public BaseTopologyManager(final ActorSystem system,
114                                final BindingNormalizedNodeCodecRegistry codecRegistry,
115                                final DataBroker dataBroker,
116                                final String topologyId,
117                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
118                                final StateAggregator aggregator,
119                                final NodeWriter naSalNodeWriter,
120                                final RoleChangeStrategy roleChangeStrategy,
121                                final boolean isMaster) {
122
123         this.system = system;
124         this.typedExtension = TypedActor.get(system);
125         this.clusterExtension = Cluster.get(system);
126         this.dataBroker = dataBroker;
127         this.topologyId = topologyId;
128         this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
129         this.aggregator = aggregator;
130         this.naSalNodeWriter = naSalNodeWriter;
131         this.roleChangeStrategy = roleChangeStrategy;
132         this.codecRegistry = codecRegistry;
133
134         // election has not yet happened
135         this.isMaster = isMaster;
136
137         this.topologyListPath = TopologyUtil.createTopologyListPath(topologyId);
138
139         LOG.debug("Base manager started ", +id);
140     }
141
142     @Override
143     public void preStart() {
144         LOG.debug("preStart called");
145         // TODO change to enum, master/slave active/standby
146         roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
147         LOG.debug("candidate registered");
148         clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
149     }
150
151     @Override
152     public void postStop() {
153         LOG.debug("postStop called");
154         clusterExtension.leave(clusterExtension.selfAddress());
155         clusterExtension.unsubscribe(TypedActor.context().self());
156     }
157
158     @Override
159     public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
160         LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
161
162         if (created.contains(nodeId)) {
163             LOG.warn("Node{} already exists, triggering update..", nodeId);
164             return onNodeUpdated(nodeId, node);
165         }
166         created.add(nodeId);
167         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
168
169         if (isMaster) {
170
171             futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
172             // only master should call connect on peers and aggregate futures
173             for (TopologyManager topologyManager : peers.values()) {
174                 // convert binding into NormalizedNode for transfer
175                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
176
177                 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
178                 LOG.debug("Value {}", normalizedNodeEntry.getValue());
179
180                 // add a future into our futures that gets its completion status from the converted scala future
181                 final SettableFuture<Node> settableFuture = SettableFuture.create();
182                 futures.add(settableFuture);
183                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
184                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
185                     @Override
186                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
187                         if (failure != null) {
188                             settableFuture.setException(failure);
189                             return;
190                         }
191                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
192                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
193                         final Node value = (Node) fromNormalizedNode.getValue();
194
195                         settableFuture.set(value);
196                     }
197                 }, TypedActor.context().dispatcher());
198             }
199
200             final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
201             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
202                 @Override
203                 public void onSuccess(final Node result) {
204                     LOG.debug("Futures aggregated succesfully");
205                     naSalNodeWriter.init(nodeId, result);
206                 }
207
208                 @Override
209                 public void onFailure(final Throwable t) {
210                     // If the combined connection attempt failed, set the node to connection failed
211                     LOG.debug("Futures aggregation failed");
212                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
213                     // FIXME disconnect those which succeeded
214                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
215                 }
216             }, TypedActor.context().dispatcher());
217
218             //combine peer futures
219             return aggregatedFuture;
220         }
221
222         // trigger create on this slave
223         return delegateTopologyHandler.onNodeCreated(nodeId, node);
224     }
225
226     @Override
227     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
228         LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
229
230         // Master needs to trigger onNodeUpdated on peers and combine results
231         if (isMaster) {
232             // first cleanup old node
233             final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
234             final SettableFuture<Node> createFuture = SettableFuture.create();
235             final TopologyManager selfProxy = TypedActor.self();
236             final ActorContext context = TypedActor.context();
237             Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
238                 @Override
239                 public void onSuccess(Void result) {
240                     LOG.warn("Delete part of update succesfull, triggering create");
241                     // trigger create on all nodes
242                     Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
243                         @Override
244                         public void onSuccess(Node result) {
245                             createFuture.set(result);
246                         }
247
248                         @Override
249                         public void onFailure(Throwable t) {
250                             createFuture.setException(t);
251                         }
252                     }, context.dispatcher());
253                 }
254
255                 @Override
256                 public void onFailure(Throwable t) {
257                     LOG.warn("Delete part of update failed, {}", t);
258                 }
259             }, context.dispatcher());
260             return createFuture;
261         }
262
263         // Trigger update on this slave
264         return delegateTopologyHandler.onNodeUpdated(nodeId, node);
265     }
266
267     @Override
268     public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
269         final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
270         created.remove(nodeId);
271
272         // Master needs to trigger delete on peers and combine results
273         if (isMaster) {
274             futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
275             for (TopologyManager topologyManager : peers.values()) {
276                 // add a future into our futures that gets its completion status from the converted scala future
277                 final SettableFuture<Void> settableFuture = SettableFuture.create();
278                 futures.add(settableFuture);
279                 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
280                 scalaFuture.onComplete(new OnComplete<Void>() {
281                     @Override
282                     public void onComplete(Throwable failure, Void success) throws Throwable {
283                         if (failure != null) {
284                             settableFuture.setException(failure);
285                             return;
286                         }
287
288                         settableFuture.set(success);
289                     }
290                 }, TypedActor.context().dispatcher());
291             }
292
293             final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
294             Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
295                 @Override
296                 public void onSuccess(final Void result) {
297                     naSalNodeWriter.delete(nodeId);
298                 }
299
300                 @Override
301                 public void onFailure(final Throwable t) {
302                     // FIXME unable to disconnect all the connections, what do we do now ?
303                 }
304             });
305
306             return aggregatedFuture;
307         }
308
309         // Trigger delete
310         return delegateTopologyHandler.onNodeDeleted(nodeId);
311     }
312
313     @Nonnull
314     @Override
315     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
316         return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
317     }
318
319     @Override
320     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
321         isMaster = roleChangeDTO.isOwner();
322         delegateTopologyHandler.onRoleChanged(roleChangeDTO);
323         if (isMaster) {
324             LOG.debug("Node {} is master now", clusterExtension.selfAddress());
325             clusterExtension.join(clusterExtension.selfAddress());
326         }
327     }
328
329     @Override
330     public Future<Boolean> isMaster() {
331         return new DefaultPromise<Boolean>().success(isMaster).future();
332     }
333
334     @Override
335     public void notifyNodeStatusChange(final NodeId nodeId) {
336         LOG.debug("Connection status has changed on node {}", nodeId.getValue());
337         if (isMaster) {
338             // grab status from all peers and aggregate
339             final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
340             futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
341             // only master should call connect on peers and aggregate futures
342             for (TopologyManager topologyManager : peers.values()) {
343                 // add a future into our futures that gets its completion status from the converted scala future
344                 final SettableFuture<Node> settableFuture = SettableFuture.create();
345                 futures.add(settableFuture);
346                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
347                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
348                     @Override
349                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
350                         if (failure != null) {
351                             settableFuture.setException(failure);
352                             return;
353                         }
354                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
355                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
356                         final Node value = (Node) fromNormalizedNode.getValue();
357
358                         settableFuture.set(value);
359                     }
360                 }, TypedActor.context().dispatcher());
361             }
362
363             final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
364             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
365                 @Override
366                 public void onSuccess(final Node result) {
367                     LOG.debug("Futures aggregated succesfully");
368                     naSalNodeWriter.update(nodeId, result);
369                 }
370
371                 @Override
372                 public void onFailure(final Throwable t) {
373                     // If the combined connection attempt failed, set the node to connection failed
374                     LOG.debug("Futures aggregation failed");
375                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
376                     // FIXME disconnect those which succeeded
377                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
378                 }
379             });
380             return;
381         }
382         LOG.debug("Not master, forwarding..");
383         for (final TopologyManager manager : peers.values()) {
384             // asynchronously find out which peer is master
385             final Future<Boolean> future = manager.isMaster();
386             future.onComplete(new OnComplete<Boolean>() {
387                 @Override
388                 public void onComplete(Throwable failure, Boolean success) throws Throwable {
389                     if (failure == null && success) {
390                         LOG.debug("Found master peer");
391                         // forward to master
392                         manager.notifyNodeStatusChange(nodeId);
393                         return;
394                     }
395                     if (failure != null) {
396                         LOG.debug("Retrieving master peer failed, {}", failure);
397                     }
398                 }
399             }, TypedActor.context().dispatcher());
400         }
401     }
402
403     @Override
404     public boolean hasAllPeersUp() {
405         LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
406         LOG.warn(clusterExtension.state().toString());
407         LOG.warn(peers.toString());
408         return peers.size() == 2;
409     }
410
411     @Override
412     public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
413         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
414                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
415         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
416         final Node value = (Node) fromNormalizedNode.getValue();
417
418         LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
419         final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
420         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
421         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
422             @Override
423             public void onSuccess(Node result) {
424                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
425                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
426             }
427
428             @Override
429             public void onFailure(Throwable t) {
430                 promise.failure(t);
431             }
432         });
433
434         return promise.future();
435     }
436
437     @Override
438     public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
439         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
440                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
441         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
442         final Node value = (Node) fromNormalizedNode.getValue();
443
444         LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
445
446         final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
447         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
448         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
449             @Override
450             public void onSuccess(Node result) {
451                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
452                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
453             }
454
455             @Override
456             public void onFailure(Throwable t) {
457                 promise.failure(t);
458             }
459         });
460         return promise.future();
461     }
462
463     @Override
464     public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
465         LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
466
467         final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
468         final DefaultPromise<Void> promise = new DefaultPromise<>();
469         Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
470             @Override
471             public void onSuccess(Void result) {
472                 promise.success(null);
473             }
474
475             @Override
476             public void onFailure(Throwable t) {
477                 promise.failure(t);
478             }
479         });
480
481         return promise.future();
482     }
483
484     public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
485         LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
486
487         final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
488         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
489         Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
490             @Override
491             public void onSuccess(Node result) {
492                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), result);
493                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
494             }
495
496             @Override
497             public void onFailure(Throwable t) {
498                 promise.failure(t);
499             }
500         });
501         return promise.future();
502     }
503
504     @Override
505     public void onReceive(final Object message, final ActorRef actorRef) {
506         LOG.debug("message received {}", message);
507         if (message instanceof MemberUp) {
508             final Member member = ((MemberUp) message).member();
509             LOG.info("Member is Up: {}", member);
510             if (member.address().equals(clusterExtension.selfAddress())) {
511                 return;
512             }
513
514             final String path = member.address() + PATH + topologyId;
515             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
516
517             // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
518             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
519         } else if (message instanceof MemberExited) {
520             // remove peer
521             final Member member = ((MemberExited) message).member();
522             LOG.info("Member exited cluster: {}", member);
523             peers.remove(member.address());
524         } else if (message instanceof MemberRemoved) {
525             // remove peer
526             final Member member = ((MemberRemoved) message).member();
527             LOG.info("Member was removed from cluster: {}", member);
528             peers.remove(member.address());
529         } else if (message instanceof UnreachableMember) {
530             // remove peer
531             final Member member = ((UnreachableMember) message).member();
532             LOG.info("Member is unreachable: {}", member);
533             peers.remove(member.address());
534         } else if (message instanceof ReachableMember) {
535             // resync peer
536             final Member member = ((ReachableMember) message).member();
537             LOG.info("Member is reachable again: {}", member);
538
539             if (member.address().equals(clusterExtension.selfAddress())) {
540                 return;
541             }
542
543             final String path = member.address() + PATH + topologyId;
544             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
545
546             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
547         } else if (message instanceof ActorIdentity) {
548             LOG.debug("Received ActorIdentity message", message);
549             final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
550             if (((ActorIdentity) message).getRef() == null) {
551                 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
552                 final ActorRef self = TypedActor.context().self();
553                 final ActorContext context = TypedActor.context();
554                 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
555                     @Override
556                     public void run() {
557                         LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
558                         context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
559
560                     }
561                 }, system.dispatcher());
562                 return;
563             }
564             LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
565
566             clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
567         } else if (message instanceof CustomIdentifyMessageReply) {
568
569             LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
570             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
571                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
572                 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
573                 if (isMaster) {
574                     resyncPeer(peer);
575                 }
576             }
577         } else if (message instanceof CustomIdentifyMessage) {
578             LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
579             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
580                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
581                 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
582                 if (isMaster) {
583                     resyncPeer(peer);
584                 }
585             }
586             actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
587         }
588     }
589
590     private void resyncPeer(final TopologyManager peer) {
591         final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
592         final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
593
594         Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
595             @Override
596             public void onSuccess(Optional<Topology> result) {
597                 if (result.isPresent() && result.get().getNode() != null) {
598                     for (final Node node : result.get().getNode()) {
599                         final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
600                         peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
601                         // we dont care about the future from now on since we will be notified by the onConnected event
602                     }
603                 }
604             }
605
606             @Override
607             public void onFailure(Throwable t) {
608                 LOG.error("Unable to read from datastore");
609             }
610         });
611
612     }
613 }