ca55bb0552ee5e5bf4d7ce41e56652f023ee5fec
[netconf.git] / opendaylight / netconf / abstract-topology / src / main / java / org / opendaylight / netconf / topology / util / BaseTopologyManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.util;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.Address;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.ClusterEvent.MemberEvent;
20 import akka.cluster.ClusterEvent.MemberExited;
21 import akka.cluster.ClusterEvent.MemberRemoved;
22 import akka.cluster.ClusterEvent.MemberUp;
23 import akka.cluster.ClusterEvent.ReachableMember;
24 import akka.cluster.ClusterEvent.UnreachableMember;
25 import akka.cluster.Member;
26 import akka.dispatch.OnComplete;
27 import com.google.common.util.concurrent.FutureCallback;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.SettableFuture;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.Random;
36 import javax.annotation.Nonnull;
37 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
38 import org.opendaylight.netconf.topology.NodeManager;
39 import org.opendaylight.netconf.topology.RoleChangeStrategy;
40 import org.opendaylight.netconf.topology.StateAggregator;
41 import org.opendaylight.netconf.topology.TopologyManager;
42 import org.opendaylight.netconf.topology.TopologyManagerCallback;
43 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
44 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
45 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
46 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
53 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
54 import org.opendaylight.yangtools.yang.binding.DataObject;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Future;
61 import scala.concurrent.impl.Promise.DefaultPromise;
62
63 public final class BaseTopologyManager
64         implements TopologyManager {
65
66     private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
67
68     private final ActorSystem system;
69     private final TypedActorExtension typedExtension;
70     private final Cluster clusterExtension;
71
72     private final BindingNormalizedNodeCodecRegistry codecRegistry;
73
74     private static final String PATH = "/user/";
75
76     private final DataBroker dataBroker;
77     private final RoleChangeStrategy roleChangeStrategy;
78     private final StateAggregator aggregator;
79
80     private final NodeWriter naSalNodeWriter;
81     private final String topologyId;
82     private final TopologyManagerCallback delegateTopologyHandler;
83
84     private final Map<NodeId, NodeManager> nodes = new HashMap<>();
85     private final Map<Address, TopologyManager> peers = new HashMap<>();
86     private TopologyManager masterPeer = null;
87     private final int id = new Random().nextInt();
88
89     private boolean isMaster;
90
91     public BaseTopologyManager(final ActorSystem system,
92                                final BindingNormalizedNodeCodecRegistry codecRegistry,
93                                final DataBroker dataBroker,
94                                final String topologyId,
95                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
96                                final StateAggregator aggregator,
97                                final NodeWriter naSalNodeWriter,
98                                final RoleChangeStrategy roleChangeStrategy) {
99         this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
100     }
101
102     public BaseTopologyManager(final ActorSystem system,
103                                final BindingNormalizedNodeCodecRegistry codecRegistry,
104                                final DataBroker dataBroker,
105                                final String topologyId,
106                                final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
107                                final StateAggregator aggregator,
108                                final NodeWriter naSalNodeWriter,
109                                final RoleChangeStrategy roleChangeStrategy,
110                                final boolean isMaster) {
111
112         this.system = system;
113         this.typedExtension = TypedActor.get(system);
114         this.clusterExtension = Cluster.get(system);
115         this.dataBroker = dataBroker;
116         this.topologyId = topologyId;
117         this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
118         this.aggregator = aggregator;
119         this.naSalNodeWriter = naSalNodeWriter;
120         this.roleChangeStrategy = roleChangeStrategy;
121         this.codecRegistry = codecRegistry;
122
123         // election has not yet happened
124         this.isMaster = isMaster;
125
126         LOG.debug("Base manager started ", +id);
127     }
128
129     @Override
130     public void preStart() {
131         LOG.debug("preStart called");
132         // TODO change to enum, master/slave active/standby
133         roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
134         LOG.debug("candidate registered");
135         clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
136     }
137
138     @Override
139     public void postStop() {
140         LOG.debug("postStop called");
141         clusterExtension.leave(clusterExtension.selfAddress());
142         clusterExtension.unsubscribe(TypedActor.context().self());
143     }
144
145     @Override
146     public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
147         LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
148
149         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
150
151         if (isMaster) {
152
153             futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
154             // only master should call connect on peers and aggregate futures
155             for (TopologyManager topologyManager : peers.values()) {
156                 // convert binding into NormalizedNode for transfer
157                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
158
159                 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
160                 LOG.debug("Value {}", normalizedNodeEntry.getValue());
161
162                 // add a future into our futures that gets its completion status from the converted scala future
163                 final SettableFuture<Node> settableFuture = SettableFuture.create();
164                 futures.add(settableFuture);
165                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
166                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
167                     @Override
168                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
169                         if (failure != null) {
170                             settableFuture.setException(failure);
171                             return;
172                         }
173                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
174                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
175                         final Node value = (Node) fromNormalizedNode.getValue();
176
177                         settableFuture.set(value);
178                     }
179                 }, TypedActor.context().dispatcher());
180             }
181
182             final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
183             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
184                 @Override
185                 public void onSuccess(final Node result) {
186                     LOG.debug("Futures aggregated succesfully");
187                     naSalNodeWriter.init(nodeId, result);
188                 }
189
190                 @Override
191                 public void onFailure(final Throwable t) {
192                     // If the combined connection attempt failed, set the node to connection failed
193                     LOG.debug("Futures aggregation failed");
194                     naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
195                     // FIXME disconnect those which succeeded
196                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
197                 }
198             }, TypedActor.context().dispatcher());
199
200             //combine peer futures
201             return aggregatedFuture;
202         }
203
204         // trigger create on this slave
205         return delegateTopologyHandler.onNodeCreated(nodeId, node);
206     }
207
208     @Override
209     public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
210         LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
211
212         final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
213
214         // Master needs to trigger onNodeUpdated on peers and combine results
215         if (isMaster) {
216             futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
217             for (TopologyManager topologyManager : peers.values()) {
218                 // convert binding into NormalizedNode for transfer
219                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
220
221                 // add a future into our futures that gets its completion status from the converted scala future
222                 final SettableFuture<Node> settableFuture = SettableFuture.create();
223                 futures.add(settableFuture);
224                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
225                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
226                     @Override
227                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
228                         if (failure != null) {
229                             settableFuture.setException(failure);
230                             return;
231                         }
232                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
233                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
234                         final Node value = (Node) fromNormalizedNode.getValue();
235
236                         settableFuture.set(value);
237                     }
238                 }, TypedActor.context().dispatcher());
239             }
240
241             final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
242             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
243                 @Override
244                 public void onSuccess(final Node result) {
245                     // FIXME make this (writing state data for nodes) optional and customizable
246                     // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
247                     naSalNodeWriter.update(nodeId, result);
248                 }
249
250                 @Override
251                 public void onFailure(final Throwable t) {
252                     // If the combined connection attempt failed, set the node to connection failed
253                     naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
254                     // FIXME disconnect those which succeeded
255                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
256                 }
257             });
258
259             //combine peer futures
260             return aggregatedFuture;
261         }
262
263         // Trigger update on this slave
264         return delegateTopologyHandler.onNodeUpdated(nodeId, node);
265     }
266
267     private static InstanceIdentifier<Node> getNodeIid(final String topologyId) {
268         final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
269         return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Node.class);
270     }
271
272     @Override
273     public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
274         final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
275
276         // Master needs to trigger delete on peers and combine results
277         if (isMaster) {
278             futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
279             for (TopologyManager topologyManager : peers.values()) {
280                 // add a future into our futures that gets its completion status from the converted scala future
281                 final SettableFuture<Void> settableFuture = SettableFuture.create();
282                 futures.add(settableFuture);
283                 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
284                 scalaFuture.onComplete(new OnComplete<Void>() {
285                     @Override
286                     public void onComplete(Throwable failure, Void success) throws Throwable {
287                         if (failure != null) {
288                             settableFuture.setException(failure);
289                             return;
290                         }
291
292                         settableFuture.set(success);
293                     }
294                 }, TypedActor.context().dispatcher());
295             }
296
297             final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
298             Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
299                 @Override
300                 public void onSuccess(final Void result) {
301                     naSalNodeWriter.delete(nodeId);
302                 }
303
304                 @Override
305                 public void onFailure(final Throwable t) {
306                     // FIXME unable to disconnect all the connections, what do we do now ?
307                 }
308             });
309
310             return aggregatedFuture;
311         }
312
313         // Trigger delete
314         return delegateTopologyHandler.onNodeDeleted(nodeId);
315     }
316
317     @Nonnull
318     @Override
319     public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
320         return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
321     }
322
323     @Override
324     public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
325         isMaster = roleChangeDTO.isOwner();
326         delegateTopologyHandler.onRoleChanged(roleChangeDTO);
327         if (isMaster) {
328             LOG.debug("Node {} is master now", clusterExtension.selfAddress());
329             clusterExtension.join(clusterExtension.selfAddress());
330         }
331     }
332
333     @Override
334     public Future<Boolean> isMaster() {
335         return new DefaultPromise<Boolean>().success(isMaster).future();
336     }
337
338     @Override
339     public void notifyNodeStatusChange(final NodeId nodeId) {
340         LOG.debug("Connection status has changed on node {}", nodeId.getValue());
341         if (isMaster) {
342             // grab status from all peers and aggregate
343             final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
344             futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
345             // only master should call connect on peers and aggregate futures
346             for (TopologyManager topologyManager : peers.values()) {
347                 // add a future into our futures that gets its completion status from the converted scala future
348                 final SettableFuture<Node> settableFuture = SettableFuture.create();
349                 futures.add(settableFuture);
350                 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
351                 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
352                     @Override
353                     public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
354                         if (failure != null) {
355                             settableFuture.setException(failure);
356                             return;
357                         }
358                         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
359                                 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
360                         final Node value = (Node) fromNormalizedNode.getValue();
361
362                         settableFuture.set(value);
363                     }
364                 }, TypedActor.context().dispatcher());
365             }
366
367             final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
368             Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
369                 @Override
370                 public void onSuccess(final Node result) {
371                     LOG.debug("Futures aggregated succesfully");
372                     naSalNodeWriter.update(nodeId, result);
373                 }
374
375                 @Override
376                 public void onFailure(final Throwable t) {
377                     // If the combined connection attempt failed, set the node to connection failed
378                     LOG.debug("Futures aggregation failed");
379                     naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, null));
380                     // FIXME disconnect those which succeeded
381                     // just issue a delete on delegateTopologyHandler that gets handled on lower level
382                 }
383             });
384             return;
385         }
386         LOG.debug("Not master, forwarding..");
387         for (final TopologyManager manager : peers.values()) {
388             // asynchronously find out which peer is master
389             final Future<Boolean> future = manager.isMaster();
390             future.onComplete(new OnComplete<Boolean>() {
391                 @Override
392                 public void onComplete(Throwable failure, Boolean success) throws Throwable {
393                     if (failure == null && success) {
394                         LOG.debug("Found master peer");
395                         // forward to master
396                         manager.notifyNodeStatusChange(nodeId);
397                         return;
398                     }
399                     if (failure != null) {
400                         LOG.debug("Retrieving master peer failed, {}", failure);
401                     }
402                 }
403             }, TypedActor.context().dispatcher());
404         }
405     }
406
407     @Override
408     public boolean hasAllPeersUp() {
409         LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
410         LOG.warn(clusterExtension.state().toString());
411         LOG.warn(peers.toString());
412         return peers.size() == 2;
413     }
414
415     @Override
416     public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
417         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
418                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
419         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
420         final Node value = (Node) fromNormalizedNode.getValue();
421
422         LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
423         final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
424         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
425         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
426             @Override
427             public void onSuccess(Node result) {
428                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
429                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
430             }
431
432             @Override
433             public void onFailure(Throwable t) {
434                 promise.failure(t);
435             }
436         });
437
438         return promise.future();
439     }
440
441     @Override
442     public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
443         final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
444                 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
445         final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
446         final Node value = (Node) fromNormalizedNode.getValue();
447
448         LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
449
450         final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
451         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
452         Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
453             @Override
454             public void onSuccess(Node result) {
455                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
456                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
457             }
458
459             @Override
460             public void onFailure(Throwable t) {
461                 promise.failure(t);
462             }
463         });
464         return promise.future();
465     }
466
467     @Override
468     public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
469         LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
470
471         final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
472         final DefaultPromise<Void> promise = new DefaultPromise<>();
473         Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
474             @Override
475             public void onSuccess(Void result) {
476                 promise.success(null);
477             }
478
479             @Override
480             public void onFailure(Throwable t) {
481                 promise.failure(t);
482             }
483         });
484
485         return promise.future();
486     }
487
488     public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
489         LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
490
491         final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
492         final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
493         Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
494             @Override
495             public void onSuccess(Node result) {
496                 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), result);
497                 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
498             }
499
500             @Override
501             public void onFailure(Throwable t) {
502                 promise.failure(t);
503             }
504         });
505         return promise.future();
506     }
507
508     @Override
509     public void onReceive(final Object message, final ActorRef actorRef) {
510         LOG.debug("message received {}", message);
511         if (message instanceof MemberUp) {
512             final Member member = ((MemberUp) message).member();
513             LOG.info("Member is Up: {}", member);
514             if (member.address().equals(clusterExtension.selfAddress())) {
515                 return;
516             }
517
518             final String path = member.address() + PATH + topologyId;
519             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
520
521             clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
522         } else if (message instanceof MemberExited) {
523             // remove peer
524             final Member member = ((MemberExited) message).member();
525             LOG.info("Member exited cluster: {}", member);
526             peers.remove(member.address());
527         } else if (message instanceof MemberRemoved) {
528             // remove peer
529             final Member member = ((MemberRemoved) message).member();
530             LOG.info("Member was removed from cluster: {}", member);
531             peers.remove(member.address());
532         } else if (message instanceof UnreachableMember) {
533             // remove peer
534             final Member member = ((UnreachableMember) message).member();
535             LOG.info("Member is unreachable: {}", member);
536             peers.remove(member.address());
537         } else if (message instanceof ReachableMember) {
538             // resync peer
539             final Member member = ((ReachableMember) message).member();
540             LOG.info("Member is reachable again: {}", member);
541
542             if (member.address().equals(clusterExtension.selfAddress())) {
543                 return;
544             }
545
546             final String path = member.address() + PATH + topologyId;
547             LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
548
549             clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
550         } else if (message instanceof CustomIdentifyMessageReply) {
551             LOG.debug("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
552             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
553                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
554                 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
555             }
556         } else if (message instanceof CustomIdentifyMessage) {
557             LOG.debug("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
558             if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
559                 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
560                 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
561             }
562             actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
563         }
564     }
565 }