2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.util;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.actor.Address;
14 import akka.actor.TypedActor;
15 import akka.actor.TypedActorExtension;
16 import akka.actor.TypedProps;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.ClusterEvent.MemberEvent;
20 import akka.cluster.ClusterEvent.MemberExited;
21 import akka.cluster.ClusterEvent.MemberRemoved;
22 import akka.cluster.ClusterEvent.MemberUp;
23 import akka.cluster.ClusterEvent.ReachableMember;
24 import akka.cluster.ClusterEvent.UnreachableMember;
25 import akka.cluster.Member;
26 import akka.dispatch.OnComplete;
27 import com.google.common.util.concurrent.FutureCallback;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.SettableFuture;
31 import java.util.ArrayList;
32 import java.util.HashMap;
34 import java.util.Map.Entry;
35 import java.util.Random;
36 import javax.annotation.Nonnull;
37 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
38 import org.opendaylight.netconf.topology.NodeManager;
39 import org.opendaylight.netconf.topology.RoleChangeStrategy;
40 import org.opendaylight.netconf.topology.StateAggregator;
41 import org.opendaylight.netconf.topology.TopologyManager;
42 import org.opendaylight.netconf.topology.TopologyManagerCallback;
43 import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
44 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
45 import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
46 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
51 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
52 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
53 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
54 import org.opendaylight.yangtools.yang.binding.DataObject;
55 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Future;
61 import scala.concurrent.impl.Promise.DefaultPromise;
63 public final class BaseTopologyManager
64 implements TopologyManager {
66 private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
68 private final ActorSystem system;
69 private final TypedActorExtension typedExtension;
70 private final Cluster clusterExtension;
72 private final BindingNormalizedNodeCodecRegistry codecRegistry;
74 private static final String PATH = "/user/";
76 private final DataBroker dataBroker;
77 private final RoleChangeStrategy roleChangeStrategy;
78 private final StateAggregator aggregator;
80 private final NodeWriter naSalNodeWriter;
81 private final String topologyId;
82 private final TopologyManagerCallback delegateTopologyHandler;
84 private final Map<NodeId, NodeManager> nodes = new HashMap<>();
85 private final Map<Address, TopologyManager> peers = new HashMap<>();
86 private TopologyManager masterPeer = null;
87 private final int id = new Random().nextInt();
89 private boolean isMaster;
91 public BaseTopologyManager(final ActorSystem system,
92 final BindingNormalizedNodeCodecRegistry codecRegistry,
93 final DataBroker dataBroker,
94 final String topologyId,
95 final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
96 final StateAggregator aggregator,
97 final NodeWriter naSalNodeWriter,
98 final RoleChangeStrategy roleChangeStrategy) {
99 this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
102 public BaseTopologyManager(final ActorSystem system,
103 final BindingNormalizedNodeCodecRegistry codecRegistry,
104 final DataBroker dataBroker,
105 final String topologyId,
106 final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
107 final StateAggregator aggregator,
108 final NodeWriter naSalNodeWriter,
109 final RoleChangeStrategy roleChangeStrategy,
110 final boolean isMaster) {
112 this.system = system;
113 this.typedExtension = TypedActor.get(system);
114 this.clusterExtension = Cluster.get(system);
115 this.dataBroker = dataBroker;
116 this.topologyId = topologyId;
117 this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
118 this.aggregator = aggregator;
119 this.naSalNodeWriter = naSalNodeWriter;
120 this.roleChangeStrategy = roleChangeStrategy;
121 this.codecRegistry = codecRegistry;
123 // election has not yet happened
124 this.isMaster = isMaster;
126 LOG.debug("Base manager started ", +id);
130 public void preStart() {
131 LOG.debug("preStart called");
132 // TODO change to enum, master/slave active/standby
133 roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
134 LOG.debug("candidate registered");
135 clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
139 public void postStop() {
140 LOG.debug("postStop called");
141 clusterExtension.leave(clusterExtension.selfAddress());
142 clusterExtension.unsubscribe(TypedActor.context().self());
146 public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
147 LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
149 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
153 futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
154 // only master should call connect on peers and aggregate futures
155 for (TopologyManager topologyManager : peers.values()) {
156 // convert binding into NormalizedNode for transfer
157 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
159 LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
160 LOG.debug("Value {}", normalizedNodeEntry.getValue());
162 // add a future into our futures that gets its completion status from the converted scala future
163 final SettableFuture<Node> settableFuture = SettableFuture.create();
164 futures.add(settableFuture);
165 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
166 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
168 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
169 if (failure != null) {
170 settableFuture.setException(failure);
173 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
174 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
175 final Node value = (Node) fromNormalizedNode.getValue();
177 settableFuture.set(value);
179 }, TypedActor.context().dispatcher());
182 final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
183 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
185 public void onSuccess(final Node result) {
186 LOG.debug("Futures aggregated succesfully");
187 naSalNodeWriter.init(nodeId, result);
191 public void onFailure(final Throwable t) {
192 // If the combined connection attempt failed, set the node to connection failed
193 LOG.debug("Futures aggregation failed");
194 naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
195 // FIXME disconnect those which succeeded
196 // just issue a delete on delegateTopologyHandler that gets handled on lower level
198 }, TypedActor.context().dispatcher());
200 //combine peer futures
201 return aggregatedFuture;
204 // trigger create on this slave
205 return delegateTopologyHandler.onNodeCreated(nodeId, node);
209 public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
210 LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
212 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
214 // Master needs to trigger onNodeUpdated on peers and combine results
216 futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
217 for (TopologyManager topologyManager : peers.values()) {
218 // convert binding into NormalizedNode for transfer
219 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
221 // add a future into our futures that gets its completion status from the converted scala future
222 final SettableFuture<Node> settableFuture = SettableFuture.create();
223 futures.add(settableFuture);
224 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
225 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
227 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
228 if (failure != null) {
229 settableFuture.setException(failure);
232 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
233 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
234 final Node value = (Node) fromNormalizedNode.getValue();
236 settableFuture.set(value);
238 }, TypedActor.context().dispatcher());
241 final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
242 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
244 public void onSuccess(final Node result) {
245 // FIXME make this (writing state data for nodes) optional and customizable
246 // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
247 naSalNodeWriter.update(nodeId, result);
251 public void onFailure(final Throwable t) {
252 // If the combined connection attempt failed, set the node to connection failed
253 naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
254 // FIXME disconnect those which succeeded
255 // just issue a delete on delegateTopologyHandler that gets handled on lower level
259 //combine peer futures
260 return aggregatedFuture;
263 // Trigger update on this slave
264 return delegateTopologyHandler.onNodeUpdated(nodeId, node);
267 private static InstanceIdentifier<Node> getNodeIid(final String topologyId) {
268 final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
269 return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Node.class);
273 public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
274 final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
276 // Master needs to trigger delete on peers and combine results
278 futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
279 for (TopologyManager topologyManager : peers.values()) {
280 // add a future into our futures that gets its completion status from the converted scala future
281 final SettableFuture<Void> settableFuture = SettableFuture.create();
282 futures.add(settableFuture);
283 final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
284 scalaFuture.onComplete(new OnComplete<Void>() {
286 public void onComplete(Throwable failure, Void success) throws Throwable {
287 if (failure != null) {
288 settableFuture.setException(failure);
292 settableFuture.set(success);
294 }, TypedActor.context().dispatcher());
297 final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
298 Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
300 public void onSuccess(final Void result) {
301 naSalNodeWriter.delete(nodeId);
305 public void onFailure(final Throwable t) {
306 // FIXME unable to disconnect all the connections, what do we do now ?
310 return aggregatedFuture;
314 return delegateTopologyHandler.onNodeDeleted(nodeId);
319 public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
320 return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
324 public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
325 isMaster = roleChangeDTO.isOwner();
326 delegateTopologyHandler.onRoleChanged(roleChangeDTO);
328 LOG.debug("Node {} is master now", clusterExtension.selfAddress());
329 clusterExtension.join(clusterExtension.selfAddress());
334 public Future<Boolean> isMaster() {
335 return new DefaultPromise<Boolean>().success(isMaster).future();
339 public void notifyNodeStatusChange(final NodeId nodeId) {
340 LOG.debug("Connection status has changed on node {}", nodeId.getValue());
342 // grab status from all peers and aggregate
343 final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
344 futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
345 // only master should call connect on peers and aggregate futures
346 for (TopologyManager topologyManager : peers.values()) {
347 // add a future into our futures that gets its completion status from the converted scala future
348 final SettableFuture<Node> settableFuture = SettableFuture.create();
349 futures.add(settableFuture);
350 final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
351 scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
353 public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
354 if (failure != null) {
355 settableFuture.setException(failure);
358 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
359 codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
360 final Node value = (Node) fromNormalizedNode.getValue();
362 settableFuture.set(value);
364 }, TypedActor.context().dispatcher());
367 final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
368 Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
370 public void onSuccess(final Node result) {
371 LOG.debug("Futures aggregated succesfully");
372 naSalNodeWriter.update(nodeId, result);
376 public void onFailure(final Throwable t) {
377 // If the combined connection attempt failed, set the node to connection failed
378 LOG.debug("Futures aggregation failed");
379 naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, null));
380 // FIXME disconnect those which succeeded
381 // just issue a delete on delegateTopologyHandler that gets handled on lower level
386 LOG.debug("Not master, forwarding..");
387 for (final TopologyManager manager : peers.values()) {
388 // asynchronously find out which peer is master
389 final Future<Boolean> future = manager.isMaster();
390 future.onComplete(new OnComplete<Boolean>() {
392 public void onComplete(Throwable failure, Boolean success) throws Throwable {
393 if (failure == null && success) {
394 LOG.debug("Found master peer");
396 manager.notifyNodeStatusChange(nodeId);
399 if (failure != null) {
400 LOG.debug("Retrieving master peer failed, {}", failure);
403 }, TypedActor.context().dispatcher());
408 public boolean hasAllPeersUp() {
409 LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
410 LOG.warn(clusterExtension.state().toString());
411 LOG.warn(peers.toString());
412 return peers.size() == 2;
416 public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
417 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
418 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
419 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
420 final Node value = (Node) fromNormalizedNode.getValue();
422 LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
423 final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
424 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
425 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
427 public void onSuccess(Node result) {
428 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
429 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
433 public void onFailure(Throwable t) {
438 return promise.future();
442 public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
443 final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
444 codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
445 final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
446 final Node value = (Node) fromNormalizedNode.getValue();
448 LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
450 final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
451 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
452 Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
454 public void onSuccess(Node result) {
455 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
456 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
460 public void onFailure(Throwable t) {
464 return promise.future();
468 public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
469 LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
471 final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
472 final DefaultPromise<Void> promise = new DefaultPromise<>();
473 Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
475 public void onSuccess(Void result) {
476 promise.success(null);
480 public void onFailure(Throwable t) {
485 return promise.future();
488 public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
489 LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
491 final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
492 final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
493 Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
495 public void onSuccess(Node result) {
496 final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), result);
497 promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
501 public void onFailure(Throwable t) {
505 return promise.future();
509 public void onReceive(final Object message, final ActorRef actorRef) {
510 LOG.debug("message received {}", message);
511 if (message instanceof MemberUp) {
512 final Member member = ((MemberUp) message).member();
513 LOG.info("Member is Up: {}", member);
514 if (member.address().equals(clusterExtension.selfAddress())) {
518 final String path = member.address() + PATH + topologyId;
519 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
521 clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
522 } else if (message instanceof MemberExited) {
524 final Member member = ((MemberExited) message).member();
525 LOG.info("Member exited cluster: {}", member);
526 peers.remove(member.address());
527 } else if (message instanceof MemberRemoved) {
529 final Member member = ((MemberRemoved) message).member();
530 LOG.info("Member was removed from cluster: {}", member);
531 peers.remove(member.address());
532 } else if (message instanceof UnreachableMember) {
534 final Member member = ((UnreachableMember) message).member();
535 LOG.info("Member is unreachable: {}", member);
536 peers.remove(member.address());
537 } else if (message instanceof ReachableMember) {
539 final Member member = ((ReachableMember) message).member();
540 LOG.info("Member is reachable again: {}", member);
542 if (member.address().equals(clusterExtension.selfAddress())) {
546 final String path = member.address() + PATH + topologyId;
547 LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
549 clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
550 } else if (message instanceof CustomIdentifyMessageReply) {
551 LOG.debug("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
552 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
553 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
554 peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
556 } else if (message instanceof CustomIdentifyMessage) {
557 LOG.debug("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
558 if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
559 final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
560 peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
562 actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());