Merge "Revert "Add PasswordCredentialAuth interface""
[netconf.git] / netconf / abstract-topology / src / main / java / org / opendaylight / netconf / topology / util / BaseTopologyManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.util;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Random;
42 import java.util.Set;
43 import java.util.concurrent.TimeUnit;
44 import javax.annotation.Nonnull;
45 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
46 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.netconf.topology.RoleChangeStrategy;
50 import org.opendaylight.netconf.topology.StateAggregator;
51 import org.opendaylight.netconf.topology.TopologyManager;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback;
53 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
54 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
55 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
56 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
61 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
62 import org.opendaylight.yangtools.yang.binding.DataObject;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71 import scala.concurrent.impl.Promise.DefaultPromise;
72
73 public final class BaseTopologyManager
74         implements TopologyManager {
75
76     private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
77
78     private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
79
80     private final ActorSystem system;
81     private final TypedActorExtension typedExtension;
82     private final Cluster clusterExtension;
83
84     private final BindingNormalizedNodeCodecRegistry codecRegistry;
85
86     private static final String PATH = "/user/";
87
88     private final DataBroker dataBroker;
89     private final RoleChangeStrategy roleChangeStrategy;
90     private final StateAggregator aggregator;
91
92     private final NodeWriter naSalNodeWriter;
93     private final String topologyId;
94     private final TopologyManagerCallback delegateTopologyHandler;
95     private final Set<NodeId> created = new HashSet<>();
96
97     private final Map<Address, TopologyManager> peers = new HashMap<>();
98     private final int id = new Random().nextInt();
99
100     private boolean isMaster;
101
102     public BaseTopologyManager(final ActorSystem system,
103                                final BindingNormalizedNodeCodecRegistry codecRegistry,
104                                final DataBroker dataBroker,
105                                final String topologyId,
106                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
107                                final StateAggregator aggregator,
108                                final NodeWriter naSalNodeWriter,
109                                final RoleChangeStrategy roleChangeStrategy) {
110         this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
111     }
112
113     public BaseTopologyManager(final ActorSystem system,
114                                final BindingNormalizedNodeCodecRegistry codecRegistry,
115                                final DataBroker dataBroker,
116                                final String topologyId,
117                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
118                                final StateAggregator aggregator,
119                                final NodeWriter naSalNodeWriter,
120                                final RoleChangeStrategy roleChangeStrategy,
121                                final boolean isMaster) {
122
123         this.system = system;
124         this.typedExtension = TypedActor.get(system);
125         this.clusterExtension = Cluster.get(system);
126         this.dataBroker = dataBroker;
127         this.topologyId = topologyId;
128         this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
129         this.aggregator = aggregator;
130         this.naSalNodeWriter = naSalNodeWriter;
131         this.roleChangeStrategy = roleChangeStrategy;
132         this.codecRegistry = codecRegistry;
133
134         // election has not yet happened
135         this.isMaster = isMaster;
136
137         this.topologyListPath = TopologyUtil.createTopologyListPath(topologyId);
138
139         LOG.debug("Base manager started ", +id);
140     }
141
142     @Override
143     public void preStart() {
144         LOG.debug("preStart called");
145         // TODO change to enum, master/slave active/standby
146         roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
147         LOG.debug("candidate registered");
148         clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
149     }
150
151     @Override
152     public void postStop() {
153         LOG.debug("postStop called");
154         clusterExtension.leave(clusterExtension.selfAddress());
155         clusterExtension.unsubscribe(TypedActor.context().self());
156     }
157
158     @Override
159     public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
160         LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
161
162         if (created.contains(nodeId)) {
163             LOG.warn("Node{} already exists, triggering update..", nodeId);
164             return onNodeUpdated(nodeId, node);
165         }
166         created.add(nodeId);
167         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
168
169         if (isMaster) {
170
171             futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
172             // only master should call connect on peers and aggregate futures
173             for (TopologyManager topologyManager : peers.values()) {
174                 // convert binding into NormalizedNode for transfer
175                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
176
177                 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
178                 LOG.debug("Value {}", normalizedNodeEntry.getValue());
179
180                 // add a future into our futures that gets its completion status from the converted scala future
181                 final SettableFuture<Node> settableFuture = SettableFuture.create();
182                 futures.add(settableFuture);
183                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
184                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
185                     @Override
186                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
187                         if (failure != null) {
188                             settableFuture.setException(failure);
189                             return;
190                         }
191                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
192                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
193                         final Node value = (Node) fromNormalizedNode.getValue();
194
195                         settableFuture.set(value);
196                     }
197                 }, TypedActor.context().dispatcher());
198             }
199
200             final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
201             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
202                 @Override
203                 public void onSuccess(final Node result) {
204                     LOG.debug("Futures aggregated succesfully");
205                     naSalNodeWriter.init(nodeId, result);
206                 }
207
208                 @Override
209                 public void onFailure(final Throwable t) {
210                     // If the combined connection attempt failed, set the node to connection failed
211                     LOG.debug("Futures aggregation failed");
212                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
213                 }
214             }, TypedActor.context().dispatcher());
215
216             //combine peer futures
217             return aggregatedFuture;
218         }
219
220         // trigger create on this slave
221         return delegateTopologyHandler.onNodeCreated(nodeId, node);
222     }
223
224     @Override
225     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
226         LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
227
228         // Master needs to trigger onNodeUpdated on peers and combine results
229         if (isMaster) {
230             // first cleanup old node
231             final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
232             final SettableFuture<Node> createFuture = SettableFuture.create();
233             final TopologyManager selfProxy = TypedActor.self();
234             final ActorContext context = TypedActor.context();
235             Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
236                 @Override
237                 public void onSuccess(Void result) {
238                     LOG.warn("Delete part of update succesfull, triggering create");
239                     // trigger create on all nodes
240                     Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
241                         @Override
242                         public void onSuccess(Node result) {
243                             createFuture.set(result);
244                         }
245
246                         @Override
247                         public void onFailure(Throwable t) {
248                             createFuture.setException(t);
249                         }
250                     }, context.dispatcher());
251                 }
252
253                 @Override
254                 public void onFailure(Throwable t) {
255                     LOG.warn("Delete part of update failed, {}", t);
256                 }
257             }, context.dispatcher());
258             return createFuture;
259         }
260
261         // Trigger update on this slave
262         return delegateTopologyHandler.onNodeUpdated(nodeId, node);
263     }
264
265     @Override
266     public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
267         final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
268         created.remove(nodeId);
269
270         // Master needs to trigger delete on peers and combine results
271         if (isMaster) {
272             futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
273             for (TopologyManager topologyManager : peers.values()) {
274                 // add a future into our futures that gets its completion status from the converted scala future
275                 final SettableFuture<Void> settableFuture = SettableFuture.create();
276                 futures.add(settableFuture);
277                 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
278                 scalaFuture.onComplete(new OnComplete<Void>() {
279                     @Override
280                     public void onComplete(Throwable failure, Void success) throws Throwable {
281                         if (failure != null) {
282                             settableFuture.setException(failure);
283                             return;
284                         }
285
286                         settableFuture.set(success);
287                     }
288                 }, TypedActor.context().dispatcher());
289             }
290
291             final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
292             Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
293                 @Override
294                 public void onSuccess(final Void result) {
295                     naSalNodeWriter.delete(nodeId);
296                 }
297
298                 @Override
299                 public void onFailure(final Throwable t) {
300
301                 }
302             });
303
304             return aggregatedFuture;
305         }
306
307         // Trigger delete
308         return delegateTopologyHandler.onNodeDeleted(nodeId);
309     }
310
311     @Nonnull
312     @Override
313     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
314         return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
315     }
316
317     @Override
318     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
319         isMaster = roleChangeDTO.isOwner();
320         delegateTopologyHandler.onRoleChanged(roleChangeDTO);
321         if (isMaster) {
322             LOG.debug("Node {} is master now", clusterExtension.selfAddress());
323             clusterExtension.join(clusterExtension.selfAddress());
324         }
325     }
326
327     @Override
328     public Future<Boolean> isMaster() {
329         return new DefaultPromise<Boolean>().success(isMaster).future();
330     }
331
332     @Override
333     public void notifyNodeStatusChange(final NodeId nodeId) {
334         LOG.debug("Connection status has changed on node {}", nodeId.getValue());
335         if (isMaster) {
336             // grab status from all peers and aggregate
337             final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
338             futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
339             // only master should call connect on peers and aggregate futures
340             for (TopologyManager topologyManager : peers.values()) {
341                 // add a future into our futures that gets its completion status from the converted scala future
342                 final SettableFuture<Node> settableFuture = SettableFuture.create();
343                 futures.add(settableFuture);
344                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
345                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
346                     @Override
347                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
348                         if (failure != null) {
349                             settableFuture.setException(failure);
350                             return;
351                         }
352                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
353                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
354                         final Node value = (Node) fromNormalizedNode.getValue();
355
356                         settableFuture.set(value);
357                     }
358                 }, TypedActor.context().dispatcher());
359             }
360
361             final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
362             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
363                 @Override
364                 public void onSuccess(final Node result) {
365                     LOG.debug("Futures aggregated succesfully");
366                     naSalNodeWriter.update(nodeId, result);
367                 }
368
369                 @Override
370                 public void onFailure(final Throwable t) {
371                     // If the combined connection attempt failed, set the node to connection failed
372                     LOG.debug("Futures aggregation failed");
373                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
374                 }
375             });
376             return;
377         }
378         LOG.debug("Not master, forwarding..");
379         for (final TopologyManager manager : peers.values()) {
380             // asynchronously find out which peer is master
381             final Future<Boolean> future = manager.isMaster();
382             future.onComplete(new OnComplete<Boolean>() {
383                 @Override
384                 public void onComplete(Throwable failure, Boolean success) throws Throwable {
385                     if (failure == null && success) {
386                         LOG.debug("Found master peer");
387                         // forward to master
388                         manager.notifyNodeStatusChange(nodeId);
389                         return;
390                     }
391                     if (failure != null) {
392                         LOG.debug("Retrieving master peer failed, {}", failure);
393                     }
394                 }
395             }, TypedActor.context().dispatcher());
396         }
397     }
398
399     @Override
400     public boolean hasAllPeersUp() {
401         LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
402         LOG.warn(clusterExtension.state().toString());
403         LOG.warn(peers.toString());
404         return peers.size() == 2;
405     }
406
407     @Override
408     public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
409         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
410                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
411         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
412         final Node value = (Node) fromNormalizedNode.getValue();
413
414         LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
415         final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
416         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
417         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
418             @Override
419             public void onSuccess(Node result) {
420                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
421                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
422             }
423
424             @Override
425             public void onFailure(Throwable t) {
426                 promise.failure(t);
427             }
428         });
429
430         return promise.future();
431     }
432
433     @Override
434     public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
435         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
436                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
437         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
438         final Node value = (Node) fromNormalizedNode.getValue();
439
440         LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
441
442         final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
443         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
444         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
445             @Override
446             public void onSuccess(Node result) {
447                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
448                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
449             }
450
451             @Override
452             public void onFailure(Throwable t) {
453                 promise.failure(t);
454             }
455         });
456         return promise.future();
457     }
458
459     @Override
460     public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
461         LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
462
463         final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
464         final DefaultPromise<Void> promise = new DefaultPromise<>();
465         Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
466             @Override
467             public void onSuccess(Void result) {
468                 promise.success(null);
469             }
470
471             @Override
472             public void onFailure(Throwable t) {
473                 promise.failure(t);
474             }
475         });
476
477         return promise.future();
478     }
479
480     public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
481         LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
482
483         final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
484         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
485         Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
486             @Override
487             public void onSuccess(Node result) {
488                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), result);
489                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
490             }
491
492             @Override
493             public void onFailure(Throwable t) {
494                 promise.failure(t);
495             }
496         });
497         return promise.future();
498     }
499
500     @Override
501     public void onReceive(final Object message, final ActorRef actorRef) {
502         LOG.debug("message received {}", message);
503         if (message instanceof MemberUp) {
504             final Member member = ((MemberUp) message).member();
505             LOG.info("Member is Up: {}", member);
506             if (member.address().equals(clusterExtension.selfAddress())) {
507                 return;
508             }
509
510             final String path = member.address() + PATH + topologyId;
511             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
512
513             // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
514             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
515         } else if (message instanceof MemberExited) {
516             // remove peer
517             final Member member = ((MemberExited) message).member();
518             LOG.info("Member exited cluster: {}", member);
519             peers.remove(member.address());
520         } else if (message instanceof MemberRemoved) {
521             // remove peer
522             final Member member = ((MemberRemoved) message).member();
523             LOG.info("Member was removed from cluster: {}", member);
524             peers.remove(member.address());
525         } else if (message instanceof UnreachableMember) {
526             // remove peer
527             final Member member = ((UnreachableMember) message).member();
528             LOG.info("Member is unreachable: {}", member);
529             peers.remove(member.address());
530         } else if (message instanceof ReachableMember) {
531             // resync peer
532             final Member member = ((ReachableMember) message).member();
533             LOG.info("Member is reachable again: {}", member);
534
535             if (member.address().equals(clusterExtension.selfAddress())) {
536                 return;
537             }
538
539             final String path = member.address() + PATH + topologyId;
540             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
541
542             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
543         } else if (message instanceof ActorIdentity) {
544             LOG.debug("Received ActorIdentity message", message);
545             final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
546             if (((ActorIdentity) message).getRef() == null) {
547                 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
548                 final ActorRef self = TypedActor.context().self();
549                 final ActorContext context = TypedActor.context();
550                 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
551                     @Override
552                     public void run() {
553                         LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
554                         context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
555
556                     }
557                 }, system.dispatcher());
558                 return;
559             }
560             LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
561
562             clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
563         } else if (message instanceof CustomIdentifyMessageReply) {
564
565             LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
566             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
567                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
568                 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
569                 if (isMaster) {
570                     resyncPeer(peer);
571                 }
572             }
573         } else if (message instanceof CustomIdentifyMessage) {
574             LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
575             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
576                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
577                 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
578                 if (isMaster) {
579                     resyncPeer(peer);
580                 }
581             }
582             actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
583         }
584     }
585
586     private void resyncPeer(final TopologyManager peer) {
587         final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
588         final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
589
590         Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
591             @Override
592             public void onSuccess(Optional<Topology> result) {
593                 if (result.isPresent() && result.get().getNode() != null) {
594                     for (final Node node : result.get().getNode()) {
595                         final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
596                         peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
597                         // we dont care about the future from now on since we will be notified by the onConnected event
598                     }
599                 }
600             }
601
602             @Override
603             public void onFailure(Throwable t) {
604                 LOG.error("Unable to read from datastore");
605             }
606         });
607
608     }
609 }