Change onNodeUpdated to first clean up previous state
[netconf.git] / opendaylight / netconf / abstract-topology / src / main / java / org / opendaylight / netconf / topology / util / BaseTopologyManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.util;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Random;
42 import java.util.Set;
43 import java.util.concurrent.TimeUnit;
44 import javax.annotation.Nonnull;
45 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
46 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
47 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.netconf.topology.RoleChangeStrategy;
50 import org.opendaylight.netconf.topology.StateAggregator;
51 import org.opendaylight.netconf.topology.TopologyManager;
52 import org.opendaylight.netconf.topology.TopologyManagerCallback;
53 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
54 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
55 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
56 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
61 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
62 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
63 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
64 import org.opendaylight.yangtools.yang.binding.DataObject;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71 import scala.concurrent.Future;
72 import scala.concurrent.duration.FiniteDuration;
73 import scala.concurrent.impl.Promise.DefaultPromise;
74
75 public final class BaseTopologyManager
76         implements TopologyManager {
77
78     private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
79     private static final InstanceIdentifier<NetworkTopology> NETWORK_TOPOLOGY_PATH = InstanceIdentifier.builder(NetworkTopology.class).build();
80
81     private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
82
83     private final ActorSystem system;
84     private final TypedActorExtension typedExtension;
85     private final Cluster clusterExtension;
86
87     private final BindingNormalizedNodeCodecRegistry codecRegistry;
88
89     private static final String PATH = "/user/";
90
91     private final DataBroker dataBroker;
92     private final RoleChangeStrategy roleChangeStrategy;
93     private final StateAggregator aggregator;
94
95     private final NodeWriter naSalNodeWriter;
96     private final String topologyId;
97     private final TopologyManagerCallback delegateTopologyHandler;
98     private final Set<NodeId> created = new HashSet<>();
99
100     private final Map<Address, TopologyManager> peers = new HashMap<>();
101     private TopologyManager masterPeer = null;
102     private final int id = new Random().nextInt();
103
104     private boolean isMaster;
105
106     public BaseTopologyManager(final ActorSystem system,
107                                final BindingNormalizedNodeCodecRegistry codecRegistry,
108                                final DataBroker dataBroker,
109                                final String topologyId,
110                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
111                                final StateAggregator aggregator,
112                                final NodeWriter naSalNodeWriter,
113                                final RoleChangeStrategy roleChangeStrategy) {
114         this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
115     }
116
117     public BaseTopologyManager(final ActorSystem system,
118                                final BindingNormalizedNodeCodecRegistry codecRegistry,
119                                final DataBroker dataBroker,
120                                final String topologyId,
121                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
122                                final StateAggregator aggregator,
123                                final NodeWriter naSalNodeWriter,
124                                final RoleChangeStrategy roleChangeStrategy,
125                                final boolean isMaster) {
126
127         this.system = system;
128         this.typedExtension = TypedActor.get(system);
129         this.clusterExtension = Cluster.get(system);
130         this.dataBroker = dataBroker;
131         this.topologyId = topologyId;
132         this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
133         this.aggregator = aggregator;
134         this.naSalNodeWriter = naSalNodeWriter;
135         this.roleChangeStrategy = roleChangeStrategy;
136         this.codecRegistry = codecRegistry;
137
138         // election has not yet happened
139         this.isMaster = isMaster;
140
141         this.topologyListPath = NETWORK_TOPOLOGY_PATH.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
142
143         LOG.debug("Base manager started ", +id);
144     }
145
146     @Override
147     public void preStart() {
148         LOG.debug("preStart called");
149         // TODO change to enum, master/slave active/standby
150         roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
151         LOG.debug("candidate registered");
152         clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
153     }
154
155     @Override
156     public void postStop() {
157         LOG.debug("postStop called");
158         clusterExtension.leave(clusterExtension.selfAddress());
159         clusterExtension.unsubscribe(TypedActor.context().self());
160     }
161
162     @Override
163     public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
164         LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
165
166         if (created.contains(nodeId)) {
167             LOG.warn("Node{} already exists, triggering update..", nodeId);
168             return onNodeUpdated(nodeId, node);
169         }
170         created.add(nodeId);
171         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
172
173         if (isMaster) {
174
175             futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
176             // only master should call connect on peers and aggregate futures
177             for (TopologyManager topologyManager : peers.values()) {
178                 // convert binding into NormalizedNode for transfer
179                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
180
181                 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
182                 LOG.debug("Value {}", normalizedNodeEntry.getValue());
183
184                 // add a future into our futures that gets its completion status from the converted scala future
185                 final SettableFuture<Node> settableFuture = SettableFuture.create();
186                 futures.add(settableFuture);
187                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
188                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
189                     @Override
190                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
191                         if (failure != null) {
192                             settableFuture.setException(failure);
193                             return;
194                         }
195                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
196                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
197                         final Node value = (Node) fromNormalizedNode.getValue();
198
199                         settableFuture.set(value);
200                     }
201                 }, TypedActor.context().dispatcher());
202             }
203
204             final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
205             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
206                 @Override
207                 public void onSuccess(final Node result) {
208                     LOG.debug("Futures aggregated succesfully");
209                     naSalNodeWriter.init(nodeId, result);
210                 }
211
212                 @Override
213                 public void onFailure(final Throwable t) {
214                     // If the combined connection attempt failed, set the node to connection failed
215                     LOG.debug("Futures aggregation failed");
216                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
217                     // FIXME disconnect those which succeeded
218                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
219                 }
220             }, TypedActor.context().dispatcher());
221
222             //combine peer futures
223             return aggregatedFuture;
224         }
225
226         // trigger create on this slave
227         return delegateTopologyHandler.onNodeCreated(nodeId, node);
228     }
229
230     @Override
231     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
232         LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
233
234         // Master needs to trigger onNodeUpdated on peers and combine results
235         if (isMaster) {
236             // first cleanup old node
237             final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
238             final SettableFuture<Node> createFuture = SettableFuture.create();
239             final TopologyManager selfProxy = TypedActor.self();
240             final ActorContext context = TypedActor.context();
241             Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
242                 @Override
243                 public void onSuccess(Void result) {
244                     LOG.warn("Delete part of update succesfull, triggering create");
245                     // trigger create on all nodes
246                     Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
247                         @Override
248                         public void onSuccess(Node result) {
249                             createFuture.set(result);
250                         }
251
252                         @Override
253                         public void onFailure(Throwable t) {
254                             createFuture.setException(t);
255                         }
256                     }, context.dispatcher());
257                 }
258
259                 @Override
260                 public void onFailure(Throwable t) {
261                     LOG.warn("Delete part of update failed, {}", t);
262                 }
263             }, context.dispatcher());
264             return createFuture;
265         }
266
267         // Trigger update on this slave
268         return delegateTopologyHandler.onNodeUpdated(nodeId, node);
269     }
270
271     private static InstanceIdentifier<Node> getNodeIid(final String topologyId) {
272         final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
273         return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Node.class);
274     }
275
276     @Override
277     public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
278         final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
279         created.remove(nodeId);
280
281         // Master needs to trigger delete on peers and combine results
282         if (isMaster) {
283             futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
284             for (TopologyManager topologyManager : peers.values()) {
285                 // add a future into our futures that gets its completion status from the converted scala future
286                 final SettableFuture<Void> settableFuture = SettableFuture.create();
287                 futures.add(settableFuture);
288                 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
289                 scalaFuture.onComplete(new OnComplete<Void>() {
290                     @Override
291                     public void onComplete(Throwable failure, Void success) throws Throwable {
292                         if (failure != null) {
293                             settableFuture.setException(failure);
294                             return;
295                         }
296
297                         settableFuture.set(success);
298                     }
299                 }, TypedActor.context().dispatcher());
300             }
301
302             final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
303             Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
304                 @Override
305                 public void onSuccess(final Void result) {
306                     naSalNodeWriter.delete(nodeId);
307                 }
308
309                 @Override
310                 public void onFailure(final Throwable t) {
311                     // FIXME unable to disconnect all the connections, what do we do now ?
312                 }
313             });
314
315             return aggregatedFuture;
316         }
317
318         // Trigger delete
319         return delegateTopologyHandler.onNodeDeleted(nodeId);
320     }
321
322     @Nonnull
323     @Override
324     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
325         return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
326     }
327
328     @Override
329     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
330         isMaster = roleChangeDTO.isOwner();
331         delegateTopologyHandler.onRoleChanged(roleChangeDTO);
332         if (isMaster) {
333             LOG.debug("Node {} is master now", clusterExtension.selfAddress());
334             clusterExtension.join(clusterExtension.selfAddress());
335         }
336     }
337
338     @Override
339     public Future<Boolean> isMaster() {
340         return new DefaultPromise<Boolean>().success(isMaster).future();
341     }
342
343     @Override
344     public void notifyNodeStatusChange(final NodeId nodeId) {
345         LOG.debug("Connection status has changed on node {}", nodeId.getValue());
346         if (isMaster) {
347             // grab status from all peers and aggregate
348             final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
349             futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
350             // only master should call connect on peers and aggregate futures
351             for (TopologyManager topologyManager : peers.values()) {
352                 // add a future into our futures that gets its completion status from the converted scala future
353                 final SettableFuture<Node> settableFuture = SettableFuture.create();
354                 futures.add(settableFuture);
355                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
356                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
357                     @Override
358                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
359                         if (failure != null) {
360                             settableFuture.setException(failure);
361                             return;
362                         }
363                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
364                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
365                         final Node value = (Node) fromNormalizedNode.getValue();
366
367                         settableFuture.set(value);
368                     }
369                 }, TypedActor.context().dispatcher());
370             }
371
372             final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
373             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
374                 @Override
375                 public void onSuccess(final Node result) {
376                     LOG.debug("Futures aggregated succesfully");
377                     naSalNodeWriter.update(nodeId, result);
378                 }
379
380                 @Override
381                 public void onFailure(final Throwable t) {
382                     // If the combined connection attempt failed, set the node to connection failed
383                     LOG.debug("Futures aggregation failed");
384                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
385                     // FIXME disconnect those which succeeded
386                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
387                 }
388             });
389             return;
390         }
391         LOG.debug("Not master, forwarding..");
392         for (final TopologyManager manager : peers.values()) {
393             // asynchronously find out which peer is master
394             final Future<Boolean> future = manager.isMaster();
395             future.onComplete(new OnComplete<Boolean>() {
396                 @Override
397                 public void onComplete(Throwable failure, Boolean success) throws Throwable {
398                     if (failure == null && success) {
399                         LOG.debug("Found master peer");
400                         // forward to master
401                         manager.notifyNodeStatusChange(nodeId);
402                         return;
403                     }
404                     if (failure != null) {
405                         LOG.debug("Retrieving master peer failed, {}", failure);
406                     }
407                 }
408             }, TypedActor.context().dispatcher());
409         }
410     }
411
412     @Override
413     public boolean hasAllPeersUp() {
414         LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
415         LOG.warn(clusterExtension.state().toString());
416         LOG.warn(peers.toString());
417         return peers.size() == 2;
418     }
419
420     @Override
421     public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
422         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
423                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
424         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
425         final Node value = (Node) fromNormalizedNode.getValue();
426
427         LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
428         final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
429         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
430         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
431             @Override
432             public void onSuccess(Node result) {
433                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
434                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
435             }
436
437             @Override
438             public void onFailure(Throwable t) {
439                 promise.failure(t);
440             }
441         });
442
443         return promise.future();
444     }
445
446     @Override
447     public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
448         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
449                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
450         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
451         final Node value = (Node) fromNormalizedNode.getValue();
452
453         LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
454
455         final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
456         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
457         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
458             @Override
459             public void onSuccess(Node result) {
460                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
461                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
462             }
463
464             @Override
465             public void onFailure(Throwable t) {
466                 promise.failure(t);
467             }
468         });
469         return promise.future();
470     }
471
472     @Override
473     public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
474         LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
475
476         final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
477         final DefaultPromise<Void> promise = new DefaultPromise<>();
478         Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
479             @Override
480             public void onSuccess(Void result) {
481                 promise.success(null);
482             }
483
484             @Override
485             public void onFailure(Throwable t) {
486                 promise.failure(t);
487             }
488         });
489
490         return promise.future();
491     }
492
493     public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
494         LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
495
496         final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
497         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
498         Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
499             @Override
500             public void onSuccess(Node result) {
501                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), result);
502                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
503             }
504
505             @Override
506             public void onFailure(Throwable t) {
507                 promise.failure(t);
508             }
509         });
510         return promise.future();
511     }
512
513     @Override
514     public void onReceive(final Object message, final ActorRef actorRef) {
515         LOG.debug("message received {}", message);
516         if (message instanceof MemberUp) {
517             final Member member = ((MemberUp) message).member();
518             LOG.info("Member is Up: {}", member);
519             if (member.address().equals(clusterExtension.selfAddress())) {
520                 return;
521             }
522
523             final String path = member.address() + PATH + topologyId;
524             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
525
526             // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
527             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
528         } else if (message instanceof MemberExited) {
529             // remove peer
530             final Member member = ((MemberExited) message).member();
531             LOG.info("Member exited cluster: {}", member);
532             peers.remove(member.address());
533         } else if (message instanceof MemberRemoved) {
534             // remove peer
535             final Member member = ((MemberRemoved) message).member();
536             LOG.info("Member was removed from cluster: {}", member);
537             peers.remove(member.address());
538         } else if (message instanceof UnreachableMember) {
539             // remove peer
540             final Member member = ((UnreachableMember) message).member();
541             LOG.info("Member is unreachable: {}", member);
542             peers.remove(member.address());
543         } else if (message instanceof ReachableMember) {
544             // resync peer
545             final Member member = ((ReachableMember) message).member();
546             LOG.info("Member is reachable again: {}", member);
547
548             if (member.address().equals(clusterExtension.selfAddress())) {
549                 return;
550             }
551
552             final String path = member.address() + PATH + topologyId;
553             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
554
555             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
556         } else if (message instanceof ActorIdentity) {
557             LOG.debug("Received ActorIdentity message", message);
558             final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
559             if (((ActorIdentity) message).getRef() == null) {
560                 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
561                 final ActorRef self = TypedActor.context().self();
562                 final ActorContext context = TypedActor.context();
563                 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
564                     @Override
565                     public void run() {
566                         LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
567                         context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
568
569                     }
570                 }, system.dispatcher());
571                 return;
572             }
573             LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
574
575             clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
576         } else if (message instanceof CustomIdentifyMessageReply) {
577
578             LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
579             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
580                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
581                 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
582                 if (isMaster) {
583                     resyncPeer(peer);
584                 }
585             }
586         } else if (message instanceof CustomIdentifyMessage) {
587             LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
588             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
589                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
590                 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
591                 if (isMaster) {
592                     resyncPeer(peer);
593                 }
594             }
595             actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
596         }
597     }
598
599     private void resyncPeer(final TopologyManager peer) {
600         final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
601         final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
602
603         Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
604             @Override
605             public void onSuccess(Optional<Topology> result) {
606                 if (result.isPresent()) {
607                     for (final Node node : result.get().getNode()) {
608                         final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
609                         peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
610                         // we dont care about the future from now on since we will be notified by the onConnected event
611                     }
612                 }
613             }
614
615             @Override
616             public void onFailure(Throwable t) {
617                 LOG.error("Unable to read from datastore");
618             }
619         });
620
621     }
622 }