builder class for path creating
[netconf.git] / netconf / abstract-topology / src / main / java / org / opendaylight / netconf / topology / util / BaseTopologyManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.util;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Random;
42 import java.util.Set;
43 import java.util.concurrent.TimeUnit;
44 import javax.annotation.Nonnull;
45 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
46 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.netconf.topology.RoleChangeStrategy;
50 import org.opendaylight.netconf.topology.StateAggregator;
51 import org.opendaylight.netconf.topology.TopologyManager;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback;
53 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
54 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
55 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
56 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
57 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
62 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
63 import org.opendaylight.yangtools.yang.binding.DataObject;
64 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
65 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.concurrent.Future;
71 import scala.concurrent.duration.FiniteDuration;
72 import scala.concurrent.impl.Promise.DefaultPromise;
73
74 public final class BaseTopologyManager
75         implements TopologyManager {
76
77     private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
78
79     private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
80
81     private final ActorSystem system;
82     private final TypedActorExtension typedExtension;
83     private final Cluster clusterExtension;
84
85     private final BindingNormalizedNodeCodecRegistry codecRegistry;
86
87     private static final String PATH = "/user/";
88
89     private final DataBroker dataBroker;
90     private final RoleChangeStrategy roleChangeStrategy;
91     private final StateAggregator aggregator;
92
93     private final NodeWriter naSalNodeWriter;
94     private final String topologyId;
95     private final TopologyManagerCallback delegateTopologyHandler;
96     private final Set<NodeId> created = new HashSet<>();
97
98     private final Map<Address, TopologyManager> peers = new HashMap<>();
99     private final int id = new Random().nextInt();
100
101     private boolean isMaster;
102
103     public BaseTopologyManager(final ActorSystem system,
104                                final BindingNormalizedNodeCodecRegistry codecRegistry,
105                                final DataBroker dataBroker,
106                                final String topologyId,
107                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
108                                final StateAggregator aggregator,
109                                final NodeWriter naSalNodeWriter,
110                                final RoleChangeStrategy roleChangeStrategy) {
111         this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
112     }
113
114     public BaseTopologyManager(final ActorSystem system,
115                                final BindingNormalizedNodeCodecRegistry codecRegistry,
116                                final DataBroker dataBroker,
117                                final String topologyId,
118                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
119                                final StateAggregator aggregator,
120                                final NodeWriter naSalNodeWriter,
121                                final RoleChangeStrategy roleChangeStrategy,
122                                final boolean isMaster) {
123
124         this.system = system;
125         this.typedExtension = TypedActor.get(system);
126         this.clusterExtension = Cluster.get(system);
127         this.dataBroker = dataBroker;
128         this.topologyId = topologyId;
129         this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
130         this.aggregator = aggregator;
131         this.naSalNodeWriter = naSalNodeWriter;
132         this.roleChangeStrategy = roleChangeStrategy;
133         this.codecRegistry = codecRegistry;
134
135         // election has not yet happened
136         this.isMaster = isMaster;
137
138         this.topologyListPath = TopologyUtil.createTopologyListPath(topologyId);
139
140         LOG.debug("Base manager started ", +id);
141     }
142
143     @Override
144     public void preStart() {
145         LOG.debug("preStart called");
146         // TODO change to enum, master/slave active/standby
147         roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
148         LOG.debug("candidate registered");
149         clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
150     }
151
152     @Override
153     public void postStop() {
154         LOG.debug("postStop called");
155         clusterExtension.leave(clusterExtension.selfAddress());
156         clusterExtension.unsubscribe(TypedActor.context().self());
157     }
158
159     @Override
160     public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
161         LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
162
163         if (created.contains(nodeId)) {
164             LOG.warn("Node{} already exists, triggering update..", nodeId);
165             return onNodeUpdated(nodeId, node);
166         }
167         created.add(nodeId);
168         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
169
170         if (isMaster) {
171
172             futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
173             // only master should call connect on peers and aggregate futures
174             for (TopologyManager topologyManager : peers.values()) {
175                 // convert binding into NormalizedNode for transfer
176                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
177
178                 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
179                 LOG.debug("Value {}", normalizedNodeEntry.getValue());
180
181                 // add a future into our futures that gets its completion status from the converted scala future
182                 final SettableFuture<Node> settableFuture = SettableFuture.create();
183                 futures.add(settableFuture);
184                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
185                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
186                     @Override
187                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
188                         if (failure != null) {
189                             settableFuture.setException(failure);
190                             return;
191                         }
192                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
193                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
194                         final Node value = (Node) fromNormalizedNode.getValue();
195
196                         settableFuture.set(value);
197                     }
198                 }, TypedActor.context().dispatcher());
199             }
200
201             final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
202             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
203                 @Override
204                 public void onSuccess(final Node result) {
205                     LOG.debug("Futures aggregated succesfully");
206                     naSalNodeWriter.init(nodeId, result);
207                 }
208
209                 @Override
210                 public void onFailure(final Throwable t) {
211                     // If the combined connection attempt failed, set the node to connection failed
212                     LOG.debug("Futures aggregation failed");
213                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
214                 }
215             }, TypedActor.context().dispatcher());
216
217             //combine peer futures
218             return aggregatedFuture;
219         }
220
221         // trigger create on this slave
222         return delegateTopologyHandler.onNodeCreated(nodeId, node);
223     }
224
225     @Override
226     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
227         LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
228
229         // Master needs to trigger onNodeUpdated on peers and combine results
230         if (isMaster) {
231             // first cleanup old node
232             final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
233             final SettableFuture<Node> createFuture = SettableFuture.create();
234             final TopologyManager selfProxy = TypedActor.self();
235             final ActorContext context = TypedActor.context();
236             Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
237                 @Override
238                 public void onSuccess(Void result) {
239                     LOG.warn("Delete part of update succesfull, triggering create");
240                     // trigger create on all nodes
241                     Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
242                         @Override
243                         public void onSuccess(Node result) {
244                             createFuture.set(result);
245                         }
246
247                         @Override
248                         public void onFailure(Throwable t) {
249                             createFuture.setException(t);
250                         }
251                     }, context.dispatcher());
252                 }
253
254                 @Override
255                 public void onFailure(Throwable t) {
256                     LOG.warn("Delete part of update failed, {}", t);
257                 }
258             }, context.dispatcher());
259             return createFuture;
260         }
261
262         // Trigger update on this slave
263         return delegateTopologyHandler.onNodeUpdated(nodeId, node);
264     }
265
266     @Override
267     public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
268         final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
269         created.remove(nodeId);
270
271         // Master needs to trigger delete on peers and combine results
272         if (isMaster) {
273             futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
274             for (TopologyManager topologyManager : peers.values()) {
275                 // add a future into our futures that gets its completion status from the converted scala future
276                 final SettableFuture<Void> settableFuture = SettableFuture.create();
277                 futures.add(settableFuture);
278                 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
279                 scalaFuture.onComplete(new OnComplete<Void>() {
280                     @Override
281                     public void onComplete(Throwable failure, Void success) throws Throwable {
282                         if (failure != null) {
283                             settableFuture.setException(failure);
284                             return;
285                         }
286
287                         settableFuture.set(success);
288                     }
289                 }, TypedActor.context().dispatcher());
290             }
291
292             final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
293             Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
294                 @Override
295                 public void onSuccess(final Void result) {
296                     naSalNodeWriter.delete(nodeId);
297                 }
298
299                 @Override
300                 public void onFailure(final Throwable t) {
301
302                 }
303             });
304
305             return aggregatedFuture;
306         }
307
308         // Trigger delete
309         return delegateTopologyHandler.onNodeDeleted(nodeId);
310     }
311
312     @Nonnull
313     @Override
314     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
315         return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
316     }
317
318     @Override
319     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
320         isMaster = roleChangeDTO.isOwner();
321         delegateTopologyHandler.onRoleChanged(roleChangeDTO);
322         if (isMaster) {
323             LOG.debug("Node {} is master now", clusterExtension.selfAddress());
324             clusterExtension.join(clusterExtension.selfAddress());
325         }
326     }
327
328     @Override
329     public Future<Boolean> isMaster() {
330         return new DefaultPromise<Boolean>().success(isMaster).future();
331     }
332
333     @Override
334     public void notifyNodeStatusChange(final NodeId nodeId) {
335         LOG.debug("Connection status has changed on node {}", nodeId.getValue());
336         if (isMaster) {
337             // grab status from all peers and aggregate
338             final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
339             futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
340             // only master should call connect on peers and aggregate futures
341             for (TopologyManager topologyManager : peers.values()) {
342                 // add a future into our futures that gets its completion status from the converted scala future
343                 final SettableFuture<Node> settableFuture = SettableFuture.create();
344                 futures.add(settableFuture);
345                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
346                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
347                     @Override
348                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
349                         if (failure != null) {
350                             settableFuture.setException(failure);
351                             return;
352                         }
353                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
354                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
355                         final Node value = (Node) fromNormalizedNode.getValue();
356
357                         settableFuture.set(value);
358                     }
359                 }, TypedActor.context().dispatcher());
360             }
361
362             final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
363             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
364                 @Override
365                 public void onSuccess(final Node result) {
366                     LOG.debug("Futures aggregated succesfully");
367                     naSalNodeWriter.update(nodeId, result);
368                 }
369
370                 @Override
371                 public void onFailure(final Throwable t) {
372                     // If the combined connection attempt failed, set the node to connection failed
373                     LOG.debug("Futures aggregation failed");
374                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
375                 }
376             });
377             return;
378         }
379         LOG.debug("Not master, forwarding..");
380         for (final TopologyManager manager : peers.values()) {
381             // asynchronously find out which peer is master
382             final Future<Boolean> future = manager.isMaster();
383             future.onComplete(new OnComplete<Boolean>() {
384                 @Override
385                 public void onComplete(Throwable failure, Boolean success) throws Throwable {
386                     if (failure == null && success) {
387                         LOG.debug("Found master peer");
388                         // forward to master
389                         manager.notifyNodeStatusChange(nodeId);
390                         return;
391                     }
392                     if (failure != null) {
393                         LOG.debug("Retrieving master peer failed, {}", failure);
394                     }
395                 }
396             }, TypedActor.context().dispatcher());
397         }
398     }
399
400     @Override
401     public boolean hasAllPeersUp() {
402         LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
403         LOG.warn(clusterExtension.state().toString());
404         LOG.warn(peers.toString());
405         return peers.size() == 2;
406     }
407
408     @Override
409     public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
410         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
411                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
412         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
413         final Node value = (Node) fromNormalizedNode.getValue();
414
415         LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
416         final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
417         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
418         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
419             @Override
420             public void onSuccess(Node result) {
421                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
422                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
423             }
424
425             @Override
426             public void onFailure(Throwable t) {
427                 promise.failure(t);
428             }
429         });
430
431         return promise.future();
432     }
433
434     @Override
435     public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
436         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
437                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
438         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
439         final Node value = (Node) fromNormalizedNode.getValue();
440
441         LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
442
443         final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
444         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
445         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
446             @Override
447             public void onSuccess(Node result) {
448                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
449                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
450             }
451
452             @Override
453             public void onFailure(Throwable t) {
454                 promise.failure(t);
455             }
456         });
457         return promise.future();
458     }
459
460     @Override
461     public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
462         LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
463
464         final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
465         final DefaultPromise<Void> promise = new DefaultPromise<>();
466         Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
467             @Override
468             public void onSuccess(Void result) {
469                 promise.success(null);
470             }
471
472             @Override
473             public void onFailure(Throwable t) {
474                 promise.failure(t);
475             }
476         });
477
478         return promise.future();
479     }
480
481     public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
482         LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
483
484         final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
485         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
486         Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
487             @Override
488             public void onSuccess(Node result) {
489                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), result);
490                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
491             }
492
493             @Override
494             public void onFailure(Throwable t) {
495                 promise.failure(t);
496             }
497         });
498         return promise.future();
499     }
500
501     @Override
502     public void onReceive(final Object message, final ActorRef actorRef) {
503         LOG.debug("message received {}", message);
504         if (message instanceof MemberUp) {
505             final Member member = ((MemberUp) message).member();
506             LOG.info("Member is Up: {}", member);
507             if (member.address().equals(clusterExtension.selfAddress())) {
508                 return;
509             }
510
511             final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(member.address().toString(), topologyId);
512             final String path = pathCreator.build();
513             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
514
515             // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
516             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
517         } else if (message instanceof MemberExited) {
518             // remove peer
519             final Member member = ((MemberExited) message).member();
520             LOG.info("Member exited cluster: {}", member);
521             peers.remove(member.address());
522         } else if (message instanceof MemberRemoved) {
523             // remove peer
524             final Member member = ((MemberRemoved) message).member();
525             LOG.info("Member was removed from cluster: {}", member);
526             peers.remove(member.address());
527         } else if (message instanceof UnreachableMember) {
528             // remove peer
529             final Member member = ((UnreachableMember) message).member();
530             LOG.info("Member is unreachable: {}", member);
531             peers.remove(member.address());
532         } else if (message instanceof ReachableMember) {
533             // resync peer
534             final Member member = ((ReachableMember) message).member();
535             LOG.info("Member is reachable again: {}", member);
536
537             if (member.address().equals(clusterExtension.selfAddress())) {
538                 return;
539             }
540             final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(member.address().toString(), topologyId);
541             final String path = pathCreator.build();
542             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
543
544             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
545         } else if (message instanceof ActorIdentity) {
546             LOG.debug("Received ActorIdentity message", message);
547             final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(((ActorIdentity) message).correlationId().toString(), topologyId);
548             final String path = pathCreator.build();
549             if (((ActorIdentity) message).getRef() == null) {
550                 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
551                 final ActorRef self = TypedActor.context().self();
552                 final ActorContext context = TypedActor.context();
553                 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
554                     @Override
555                     public void run() {
556                         LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
557                         context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
558
559                     }
560                 }, system.dispatcher());
561                 return;
562             }
563             LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
564
565             clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
566         } else if (message instanceof CustomIdentifyMessageReply) {
567
568             LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
569             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
570                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
571                 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
572                 if (isMaster) {
573                     resyncPeer(peer);
574                 }
575             }
576         } else if (message instanceof CustomIdentifyMessage) {
577             LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
578             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
579                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
580                 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
581                 if (isMaster) {
582                     resyncPeer(peer);
583                 }
584             }
585             actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
586         }
587     }
588
589     private void resyncPeer(final TopologyManager peer) {
590         final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
591         final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
592
593         Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
594             @Override
595             public void onSuccess(Optional<Topology> result) {
596                 if (result.isPresent() && result.get().getNode() != null) {
597                     for (final Node node : result.get().getNode()) {
598                         final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(TopologyUtil.createTopologyNodePath(topologyId), node);
599                         peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
600                         // we dont care about the future from now on since we will be notified by the onConnected event
601                     }
602                 }
603             }
604
605             @Override
606             public void onFailure(Throwable t) {
607                 LOG.error("Unable to read from datastore");
608             }
609         });
610
611     }
612 }