77417acc2ae18d0eef8cddc027ceddd87a321ccd
[netconf.git] / opendaylight / netconf / abstract-topology / src / main / java / org / opendaylight / netconf / topology / util / BaseTopologyManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.util;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorIdentity;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Identify;
17 import akka.actor.TypedActor;
18 import akka.actor.TypedActorExtension;
19 import akka.actor.TypedProps;
20 import akka.cluster.Cluster;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberEvent;
23 import akka.cluster.ClusterEvent.MemberExited;
24 import akka.cluster.ClusterEvent.MemberRemoved;
25 import akka.cluster.ClusterEvent.MemberUp;
26 import akka.cluster.ClusterEvent.ReachableMember;
27 import akka.cluster.ClusterEvent.UnreachableMember;
28 import akka.cluster.Member;
29 import akka.dispatch.OnComplete;
30 import com.google.common.base.Optional;
31 import com.google.common.util.concurrent.CheckedFuture;
32 import com.google.common.util.concurrent.FutureCallback;
33 import com.google.common.util.concurrent.Futures;
34 import com.google.common.util.concurrent.ListenableFuture;
35 import com.google.common.util.concurrent.SettableFuture;
36 import java.util.ArrayList;
37 import java.util.HashMap;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.Random;
41 import java.util.concurrent.TimeUnit;
42 import javax.annotation.Nonnull;
43 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
44 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
45 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
46 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
47 import org.opendaylight.netconf.topology.RoleChangeStrategy;
48 import org.opendaylight.netconf.topology.StateAggregator;
49 import org.opendaylight.netconf.topology.TopologyManager;
50 import org.opendaylight.netconf.topology.TopologyManagerCallback;
51 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
52 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
53 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
54 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
55 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
56 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
57 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
58 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
59 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
60 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
61 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
62 import org.opendaylight.yangtools.yang.binding.DataObject;
63 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
64 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71 import scala.concurrent.impl.Promise.DefaultPromise;
72
73 public final class BaseTopologyManager
74         implements TopologyManager {
75
76     private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
77     private static final InstanceIdentifier<NetworkTopology> NETWORK_TOPOLOGY_PATH = InstanceIdentifier.builder(NetworkTopology.class).build();
78
79     private final KeyedInstanceIdentifier<Topology, TopologyKey> topologyListPath;
80
81     private final ActorSystem system;
82     private final TypedActorExtension typedExtension;
83     private final Cluster clusterExtension;
84
85     private final BindingNormalizedNodeCodecRegistry codecRegistry;
86
87     private static final String PATH = "/user/";
88
89     private final DataBroker dataBroker;
90     private final RoleChangeStrategy roleChangeStrategy;
91     private final StateAggregator aggregator;
92
93     private final NodeWriter naSalNodeWriter;
94     private final String topologyId;
95     private final TopologyManagerCallback delegateTopologyHandler;
96
97     private final Map<Address, TopologyManager> peers = new HashMap<>();
98     private TopologyManager masterPeer = null;
99     private final int id = new Random().nextInt();
100
101     private boolean isMaster;
102
103     public BaseTopologyManager(final ActorSystem system,
104                                final BindingNormalizedNodeCodecRegistry codecRegistry,
105                                final DataBroker dataBroker,
106                                final String topologyId,
107                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
108                                final StateAggregator aggregator,
109                                final NodeWriter naSalNodeWriter,
110                                final RoleChangeStrategy roleChangeStrategy) {
111         this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
112     }
113
114     public BaseTopologyManager(final ActorSystem system,
115                                final BindingNormalizedNodeCodecRegistry codecRegistry,
116                                final DataBroker dataBroker,
117                                final String topologyId,
118                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
119                                final StateAggregator aggregator,
120                                final NodeWriter naSalNodeWriter,
121                                final RoleChangeStrategy roleChangeStrategy,
122                                final boolean isMaster) {
123
124         this.system = system;
125         this.typedExtension = TypedActor.get(system);
126         this.clusterExtension = Cluster.get(system);
127         this.dataBroker = dataBroker;
128         this.topologyId = topologyId;
129         this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
130         this.aggregator = aggregator;
131         this.naSalNodeWriter = naSalNodeWriter;
132         this.roleChangeStrategy = roleChangeStrategy;
133         this.codecRegistry = codecRegistry;
134
135         // election has not yet happened
136         this.isMaster = isMaster;
137
138         this.topologyListPath = NETWORK_TOPOLOGY_PATH.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
139
140         LOG.debug("Base manager started ", +id);
141     }
142
143     @Override
144     public void preStart() {
145         LOG.debug("preStart called");
146         // TODO change to enum, master/slave active/standby
147         roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
148         LOG.debug("candidate registered");
149         clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
150     }
151
152     @Override
153     public void postStop() {
154         LOG.debug("postStop called");
155         clusterExtension.leave(clusterExtension.selfAddress());
156         clusterExtension.unsubscribe(TypedActor.context().self());
157     }
158
159     @Override
160     public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
161         LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
162
163         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
164
165         if (isMaster) {
166
167             futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
168             // only master should call connect on peers and aggregate futures
169             for (TopologyManager topologyManager : peers.values()) {
170                 // convert binding into NormalizedNode for transfer
171                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
172
173                 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
174                 LOG.debug("Value {}", normalizedNodeEntry.getValue());
175
176                 // add a future into our futures that gets its completion status from the converted scala future
177                 final SettableFuture<Node> settableFuture = SettableFuture.create();
178                 futures.add(settableFuture);
179                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
180                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
181                     @Override
182                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
183                         if (failure != null) {
184                             settableFuture.setException(failure);
185                             return;
186                         }
187                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
188                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
189                         final Node value = (Node) fromNormalizedNode.getValue();
190
191                         settableFuture.set(value);
192                     }
193                 }, TypedActor.context().dispatcher());
194             }
195
196             final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
197             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
198                 @Override
199                 public void onSuccess(final Node result) {
200                     LOG.debug("Futures aggregated succesfully");
201                     naSalNodeWriter.init(nodeId, result);
202                 }
203
204                 @Override
205                 public void onFailure(final Throwable t) {
206                     // If the combined connection attempt failed, set the node to connection failed
207                     LOG.debug("Futures aggregation failed");
208                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
209                     // FIXME disconnect those which succeeded
210                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
211                 }
212             }, TypedActor.context().dispatcher());
213
214             //combine peer futures
215             return aggregatedFuture;
216         }
217
218         // trigger create on this slave
219         return delegateTopologyHandler.onNodeCreated(nodeId, node);
220     }
221
222     @Override
223     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
224         LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
225
226         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
227
228         // Master needs to trigger onNodeUpdated on peers and combine results
229         if (isMaster) {
230             futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
231             for (TopologyManager topologyManager : peers.values()) {
232                 // convert binding into NormalizedNode for transfer
233                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
234
235                 // add a future into our futures that gets its completion status from the converted scala future
236                 final SettableFuture<Node> settableFuture = SettableFuture.create();
237                 futures.add(settableFuture);
238                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
239                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
240                     @Override
241                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
242                         if (failure != null) {
243                             settableFuture.setException(failure);
244                             return;
245                         }
246                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
247                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
248                         final Node value = (Node) fromNormalizedNode.getValue();
249
250                         settableFuture.set(value);
251                     }
252                 }, TypedActor.context().dispatcher());
253             }
254
255             final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
256             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
257                 @Override
258                 public void onSuccess(final Node result) {
259                     // FIXME make this (writing state data for nodes) optional and customizable
260                     // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
261                     naSalNodeWriter.update(nodeId, result);
262                 }
263
264                 @Override
265                 public void onFailure(final Throwable t) {
266                     // If the combined connection attempt failed, set the node to connection failed
267                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
268                     // FIXME disconnect those which succeeded
269                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
270                 }
271             });
272
273             //combine peer futures
274             return aggregatedFuture;
275         }
276
277         // Trigger update on this slave
278         return delegateTopologyHandler.onNodeUpdated(nodeId, node);
279     }
280
281     private static InstanceIdentifier<Node> getNodeIid(final String topologyId) {
282         final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
283         return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Node.class);
284     }
285
286     @Override
287     public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
288         final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
289
290         // Master needs to trigger delete on peers and combine results
291         if (isMaster) {
292             futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
293             for (TopologyManager topologyManager : peers.values()) {
294                 // add a future into our futures that gets its completion status from the converted scala future
295                 final SettableFuture<Void> settableFuture = SettableFuture.create();
296                 futures.add(settableFuture);
297                 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
298                 scalaFuture.onComplete(new OnComplete<Void>() {
299                     @Override
300                     public void onComplete(Throwable failure, Void success) throws Throwable {
301                         if (failure != null) {
302                             settableFuture.setException(failure);
303                             return;
304                         }
305
306                         settableFuture.set(success);
307                     }
308                 }, TypedActor.context().dispatcher());
309             }
310
311             final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
312             Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
313                 @Override
314                 public void onSuccess(final Void result) {
315                     naSalNodeWriter.delete(nodeId);
316                 }
317
318                 @Override
319                 public void onFailure(final Throwable t) {
320                     // FIXME unable to disconnect all the connections, what do we do now ?
321                 }
322             });
323
324             return aggregatedFuture;
325         }
326
327         // Trigger delete
328         return delegateTopologyHandler.onNodeDeleted(nodeId);
329     }
330
331     @Nonnull
332     @Override
333     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
334         return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
335     }
336
337     @Override
338     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
339         isMaster = roleChangeDTO.isOwner();
340         delegateTopologyHandler.onRoleChanged(roleChangeDTO);
341         if (isMaster) {
342             LOG.debug("Node {} is master now", clusterExtension.selfAddress());
343             clusterExtension.join(clusterExtension.selfAddress());
344         }
345     }
346
347     @Override
348     public Future<Boolean> isMaster() {
349         return new DefaultPromise<Boolean>().success(isMaster).future();
350     }
351
352     @Override
353     public void notifyNodeStatusChange(final NodeId nodeId) {
354         LOG.debug("Connection status has changed on node {}", nodeId.getValue());
355         if (isMaster) {
356             // grab status from all peers and aggregate
357             final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
358             futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
359             // only master should call connect on peers and aggregate futures
360             for (TopologyManager topologyManager : peers.values()) {
361                 // add a future into our futures that gets its completion status from the converted scala future
362                 final SettableFuture<Node> settableFuture = SettableFuture.create();
363                 futures.add(settableFuture);
364                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
365                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
366                     @Override
367                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
368                         if (failure != null) {
369                             settableFuture.setException(failure);
370                             return;
371                         }
372                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
373                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
374                         final Node value = (Node) fromNormalizedNode.getValue();
375
376                         settableFuture.set(value);
377                     }
378                 }, TypedActor.context().dispatcher());
379             }
380
381             final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
382             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
383                 @Override
384                 public void onSuccess(final Node result) {
385                     LOG.debug("Futures aggregated succesfully");
386                     naSalNodeWriter.update(nodeId, result);
387                 }
388
389                 @Override
390                 public void onFailure(final Throwable t) {
391                     // If the combined connection attempt failed, set the node to connection failed
392                     LOG.debug("Futures aggregation failed");
393                     naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, null));
394                     // FIXME disconnect those which succeeded
395                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
396                 }
397             });
398             return;
399         }
400         LOG.debug("Not master, forwarding..");
401         for (final TopologyManager manager : peers.values()) {
402             // asynchronously find out which peer is master
403             final Future<Boolean> future = manager.isMaster();
404             future.onComplete(new OnComplete<Boolean>() {
405                 @Override
406                 public void onComplete(Throwable failure, Boolean success) throws Throwable {
407                     if (failure == null && success) {
408                         LOG.debug("Found master peer");
409                         // forward to master
410                         manager.notifyNodeStatusChange(nodeId);
411                         return;
412                     }
413                     if (failure != null) {
414                         LOG.debug("Retrieving master peer failed, {}", failure);
415                     }
416                 }
417             }, TypedActor.context().dispatcher());
418         }
419     }
420
421     @Override
422     public boolean hasAllPeersUp() {
423         LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
424         LOG.warn(clusterExtension.state().toString());
425         LOG.warn(peers.toString());
426         return peers.size() == 2;
427     }
428
429     @Override
430     public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
431         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
432                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
433         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
434         final Node value = (Node) fromNormalizedNode.getValue();
435
436         LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
437         final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
438         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
439         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
440             @Override
441             public void onSuccess(Node result) {
442                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
443                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
444             }
445
446             @Override
447             public void onFailure(Throwable t) {
448                 promise.failure(t);
449             }
450         });
451
452         return promise.future();
453     }
454
455     @Override
456     public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
457         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
458                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
459         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
460         final Node value = (Node) fromNormalizedNode.getValue();
461
462         LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
463
464         final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
465         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
466         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
467             @Override
468             public void onSuccess(Node result) {
469                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
470                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
471             }
472
473             @Override
474             public void onFailure(Throwable t) {
475                 promise.failure(t);
476             }
477         });
478         return promise.future();
479     }
480
481     @Override
482     public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
483         LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
484
485         final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
486         final DefaultPromise<Void> promise = new DefaultPromise<>();
487         Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
488             @Override
489             public void onSuccess(Void result) {
490                 promise.success(null);
491             }
492
493             @Override
494             public void onFailure(Throwable t) {
495                 promise.failure(t);
496             }
497         });
498
499         return promise.future();
500     }
501
502     public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
503         LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
504
505         final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
506         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
507         Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
508             @Override
509             public void onSuccess(Node result) {
510                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), result);
511                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
512             }
513
514             @Override
515             public void onFailure(Throwable t) {
516                 promise.failure(t);
517             }
518         });
519         return promise.future();
520     }
521
522     @Override
523     public void onReceive(final Object message, final ActorRef actorRef) {
524         LOG.debug("message received {}", message);
525         if (message instanceof MemberUp) {
526             final Member member = ((MemberUp) message).member();
527             LOG.info("Member is Up: {}", member);
528             if (member.address().equals(clusterExtension.selfAddress())) {
529                 return;
530             }
531
532             final String path = member.address() + PATH + topologyId;
533             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
534
535             // first send basic identify message in case our messages have not been loaded through osgi yet to prevent crashing akka.
536             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
537         } else if (message instanceof MemberExited) {
538             // remove peer
539             final Member member = ((MemberExited) message).member();
540             LOG.info("Member exited cluster: {}", member);
541             peers.remove(member.address());
542         } else if (message instanceof MemberRemoved) {
543             // remove peer
544             final Member member = ((MemberRemoved) message).member();
545             LOG.info("Member was removed from cluster: {}", member);
546             peers.remove(member.address());
547         } else if (message instanceof UnreachableMember) {
548             // remove peer
549             final Member member = ((UnreachableMember) message).member();
550             LOG.info("Member is unreachable: {}", member);
551             peers.remove(member.address());
552         } else if (message instanceof ReachableMember) {
553             // resync peer
554             final Member member = ((ReachableMember) message).member();
555             LOG.info("Member is reachable again: {}", member);
556
557             if (member.address().equals(clusterExtension.selfAddress())) {
558                 return;
559             }
560
561             final String path = member.address() + PATH + topologyId;
562             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
563
564             clusterExtension.system().actorSelection(path).tell(new Identify(member.address()), TypedActor.context().self());
565         } else if (message instanceof ActorIdentity) {
566             LOG.debug("Received ActorIdentity message", message);
567             final String path = ((ActorIdentity) message).correlationId() + PATH + topologyId;
568             if (((ActorIdentity) message).getRef() == null) {
569                 LOG.debug("ActorIdentity has null actor ref, retrying..", message);
570                 final ActorRef self = TypedActor.context().self();
571                 final ActorContext context = TypedActor.context();
572                 system.scheduler().scheduleOnce(new FiniteDuration(5, TimeUnit.SECONDS), new Runnable() {
573                     @Override
574                     public void run() {
575                         LOG.debug("Retrying identify message from master to node {} , full path {}", ((ActorIdentity) message).correlationId(), path);
576                         context.system().actorSelection(path).tell(new Identify(((ActorIdentity) message).correlationId()), self);
577
578                     }
579                 }, system.dispatcher());
580                 return;
581             }
582             LOG.debug("Actor at :{} is resolving topology actor for path {}, with a custom message", clusterExtension.selfAddress(), path);
583
584             clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
585         } else if (message instanceof CustomIdentifyMessageReply) {
586
587             LOG.warn("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
588             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
589                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
590                 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
591                 if (isMaster) {
592                     resyncPeer(peer);
593                 }
594             }
595         } else if (message instanceof CustomIdentifyMessage) {
596             LOG.warn("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
597             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
598                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
599                 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
600                 if (isMaster) {
601                     resyncPeer(peer);
602                 }
603             }
604             actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
605         }
606     }
607
608     private void resyncPeer(final TopologyManager peer) {
609         final ReadOnlyTransaction rTx = dataBroker.newReadOnlyTransaction();
610         final CheckedFuture<Optional<Topology>, ReadFailedException> read = rTx.read(LogicalDatastoreType.CONFIGURATION, topologyListPath);
611
612         Futures.addCallback(read, new FutureCallback<Optional<Topology>>() {
613             @Override
614             public void onSuccess(Optional<Topology> result) {
615                 if (result.isPresent()) {
616                     for (final Node node : result.get().getNode()) {
617                         final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
618                         peer.onRemoteNodeCreated(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
619                         // we dont care about the future from now on since we will be notified by the onConnected event
620                     }
621                 }
622             }
623
624             @Override
625             public void onFailure(Throwable t) {
626                 LOG.error("Unable to read from datastore");
627             }
628         });
629
630     }
631 }