</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster_2.11</artifactId>
- <version>2.3.14</version>
+ <artifactId>akka-cluster_${scala.major.version}</artifactId>
+ <version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-osgi_2.11</artifactId>
- <version>2.3.14</version>
+ <artifactId>akka-osgi_${scala.major.version}</artifactId>
+ <version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import akka.actor.ActorRef;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
+import org.opendaylight.netconf.topology.NodeManagerCallback;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
+
+public class ExampleNodeManagerCallback implements NodeManagerCallback {
+
+ public ExampleNodeManagerCallback() {
+ }
+
+ @Nonnull
+ @Override public Node getInitialState(@Nonnull final NodeId nodeId,
+ @Nonnull final Node configNode) {
+ return new NodeBuilder().addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder().setConnectionStatus(NetconfNodeFields.ConnectionStatus.Connecting).build()).build();
+ }
+
+ @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId,
+ @Nonnull final Node configNode) {
+ return new NodeBuilder().addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder().setConnectionStatus(NetconfNodeFields.ConnectionStatus.UnableToConnect).build()).build();
+ }
+
+ @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId,
+ @Nonnull final Node configNode) {
+ return Futures.immediateFuture(new NodeBuilder().addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder().setConnectionStatus(NetconfNodeFields.ConnectionStatus.Connected).build()).build());
+ }
+
+ @Nonnull @Override public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId,
+ @Nonnull final Node configNode) {
+ // update magic
+ return Futures.immediateFuture(new NodeBuilder().addAugmentation(NetconfNode.class,
+ new NetconfNodeBuilder().setConnectionStatus(NetconfNodeFields.ConnectionStatus.Connected).build()).build());
+ }
+
+ @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
+ return Futures.immediateFuture(null);
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void onReceive(Object message, ActorRef sender) {
+
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import org.opendaylight.netconf.topology.StateAggregator;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+
+/**
+ * Aggregator implementation expecting just a single state
+ */
+public final class ExampleSingleStateAggregator implements StateAggregator {
+
+ @Override public ListenableFuture<Node> combineCreateAttempts(final List<ListenableFuture<Node>> stateFutures) {
+ return getSingleFuture(stateFutures);
+ }
+
+ private <T> ListenableFuture<T> getSingleFuture(final List<ListenableFuture<T>> stateFutures) {
+ Preconditions.checkArgument(stateFutures.size() == 1, "Recieved multiple results, Single result is enforced here");
+ return stateFutures.get(0);
+ }
+
+ @Override public ListenableFuture<Node> combineUpdateAttempts(final List<ListenableFuture<Node>> stateFutures) {
+ return getSingleFuture(stateFutures);
+ }
+
+ @Override public ListenableFuture<Void> combineDeleteAttempts(final List<ListenableFuture<Void>> stateFutures) {
+ return getSingleFuture(stateFutures);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import akka.actor.ActorSystem;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.netconf.topology.NodeManagerCallback;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.TopologyManagerCallback;
+import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
+import org.opendaylight.netconf.topology.util.BaseTopologyManager;
+
+public class ExampleTopology {
+
+ private static final String TOPOLOGY_NETCONF = "topology-netconf";
+ private BaseTopologyManager netconfNodeBaseTopologyManager;
+ private final DataBroker dataBroker;
+
+ public ExampleTopology(final EntityOwnershipService entityOwnershipService, final DataBroker dataBroker) {
+ final ActorSystem actorSystem = ActorSystem.create("netconf-cluster");
+
+ this.dataBroker = dataBroker;
+
+ final NodeManagerCallbackFactory nodeManagerCallbackFactory = new NodeManagerCallbackFactory() {
+ @Override
+ public NodeManagerCallback create(String nodeId, String topologyId, ActorSystem actorSystem) {
+ return new ExampleNodeManagerCallback();
+ }
+ };
+
+ final TopologyManagerCallbackFactory topologyManagerCallbackFactory = new TopologyManagerCallbackFactory() {
+ @Override
+ public TopologyManagerCallback create(ActorSystem actorSystem, String topologyId) {
+ return new ExampleTopologyManagerCallback(actorSystem, dataBroker, topologyId, nodeManagerCallbackFactory);
+ }
+ };
+
+// netconfNodeBaseTopologyManager = new BaseTopologyManager<>(dataBroker, TOPOLOGY_NETCONF,
+// topologyManagerCallbackFactory,
+// new SingleStateAggregator(),
+// new SalNodeWriter(dataBroker, TOPOLOGY_NETCONF),
+// new TopologyRoleChangeStrategy(dataBroker, entityOwnershipService, "netconf", TOPOLOGY_NETCONF));
+
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.netconf.topology.NodeManager;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.TopologyManagerCallback;
+import org.opendaylight.netconf.topology.util.BaseNodeManager.BaseNodeManagerBuilder;
+import org.opendaylight.netconf.topology.util.NodeWriter;
+import org.opendaylight.netconf.topology.util.NoopRoleChangeStrategy;
+import org.opendaylight.netconf.topology.util.SalNodeWriter;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExampleTopologyManagerCallback implements TopologyManagerCallback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExampleTopologyManagerCallback.class);
+
+ private final DataBroker dataBroker;
+ private final ActorSystem actorSystem;
+ private boolean isMaster;
+
+ private final String topologyId;
+ private final NodeWriter naSalNodeWriter;
+ private final Map<NodeId, NodeManager> nodes = new HashMap<>();
+ private final NodeManagerCallbackFactory nodeHandlerFactory;
+
+ public ExampleTopologyManagerCallback(final ActorSystem actorSystem,
+ final DataBroker dataBroker,
+ final String topologyId,
+ final NodeManagerCallbackFactory nodeHandlerFactory) {
+ this(actorSystem, dataBroker, topologyId, nodeHandlerFactory, new SalNodeWriter(dataBroker, topologyId));
+ }
+
+ public ExampleTopologyManagerCallback(final ActorSystem actorSystem,
+ final DataBroker dataBroker,
+ final String topologyId,
+ final NodeManagerCallbackFactory nodeHandlerFactory,
+ final NodeWriter naSalNodeWriter) {
+ this(actorSystem, dataBroker, topologyId, nodeHandlerFactory, naSalNodeWriter, false);
+
+ }
+
+ public ExampleTopologyManagerCallback(final ActorSystem actorSystem,
+ final DataBroker dataBroker,
+ final String topologyId,
+ final NodeManagerCallbackFactory nodeHandlerFactory,
+ final NodeWriter naSalNodeWriter,
+ boolean isMaster) {
+ this.dataBroker = dataBroker;
+ this.actorSystem = actorSystem;
+ this.topologyId = topologyId;
+ this.nodeHandlerFactory = nodeHandlerFactory;
+ this.naSalNodeWriter = naSalNodeWriter;
+
+ this.isMaster = isMaster;
+ }
+
+ @Override
+ public ListenableFuture<Node> onNodeCreated(NodeId nodeId, Node node) {
+ // Init node admin and a writer for it
+
+ // TODO let end user code notify the baseNodeManager about state changes and handle them here on topology level
+ final NodeManager naBaseNodeManager =
+ createNodeManager(nodeId);
+
+ nodes.put(nodeId, naBaseNodeManager);
+
+ // Set initial state ? in every peer or just master ? TODO
+ if (isMaster) {
+ naSalNodeWriter.init(nodeId, naBaseNodeManager.getInitialState(nodeId, node));
+ }
+
+ // trigger connect on this node
+ return naBaseNodeManager.onNodeCreated(nodeId, node);
+ }
+
+ @Override
+ public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
+ // Set initial state
+ naSalNodeWriter.init(nodeId, nodes.get(nodeId).getInitialState(nodeId, node));
+
+ // Trigger onNodeUpdated only on this node
+ return nodes.get(nodeId).onNodeUpdated(nodeId, node);
+ }
+
+ @Override
+ public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
+ // Trigger delete only on this node
+ final ListenableFuture<Void> future = nodes.get(nodeId).onNodeDeleted(nodeId);
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ // remove proxy from node list and stop the actor
+ final NodeManager remove = nodes.remove(nodeId);
+ TypedActor.get(actorSystem).stop(remove);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // NOOP will be handled on higher level
+ }
+ });
+ return future;
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+ return nodes.get(nodeId).getCurrentStatusForNode(nodeId);
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+ isMaster = roleChangeDTO.isOwner();
+ // our post-election logic
+ }
+
+ private NodeManager createNodeManager(NodeId nodeId) {
+ return new BaseNodeManagerBuilder().setNodeId(nodeId.getValue())
+ .setActorContext(TypedActor.context())
+ .setDelegateFactory(nodeHandlerFactory)
+ .setRoleChangeStrategy(new NoopRoleChangeStrategy())
+ .setTopologyId(topologyId)
+ .build();
+ }
+
+ @Override
+ public void onReceive(Object o, ActorRef actorRef) {
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.example;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import javax.annotation.Nonnull;
+import org.opendaylight.netconf.topology.util.NodeWriter;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggingSalNodeWriter implements NodeWriter{
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoggingSalNodeWriter.class);
+
+ private final ArrayList<NodeWriter> delegates;
+
+ public LoggingSalNodeWriter(final NodeWriter... delegates) {
+ this.delegates = new ArrayList<>(Arrays.asList(delegates));
+ }
+
+ @Override
+ public void init(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
+ LOG.warn("Init recieved");
+ LOG.warn("NodeId: {}", id.getValue());
+ LOG.warn("Node: {}", operationalDataNode);
+ for (final NodeWriter delegate : delegates) {
+ delegate.init(id, operationalDataNode);
+ }
+ }
+
+ @Override
+ public void update(@Nonnull NodeId id, @Nonnull Node operationalDataNode) {
+ LOG.warn("Update recieved");
+ LOG.warn("NodeId: {}", id.getValue());
+ LOG.warn("Node: {}", operationalDataNode);
+ for (final NodeWriter delegate : delegates) {
+ delegate.update(id, operationalDataNode);
+ }
+ }
+
+ @Override
+ public void delete(@Nonnull NodeId id) {
+ LOG.warn("Delete recieved");
+ LOG.warn("NodeId: {}", id.getValue());
+ for (final NodeWriter delegate : delegates) {
+ delegate.delete(id);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
+import org.opendaylight.netconf.topology.NodeManager;
+import org.opendaylight.netconf.topology.NodeManagerCallback;
+import org.opendaylight.netconf.topology.NodeManagerCallback.NodeManagerCallbackFactory;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+public final class BaseNodeManager implements NodeManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseNodeManager.class);
+
+ private final String nodeId;
+ private final String topologyId;
+ private final ActorSystem actorSystem;
+
+ private boolean isMaster;
+ private NodeManagerCallback delegate;
+
+ private BaseNodeManager(final String nodeId,
+ final String topologyId,
+ final ActorSystem actorSystem,
+ final NodeManagerCallbackFactory delegateFactory,
+ final RoleChangeStrategy roleChangeStrategy) {
+ LOG.debug("Creating BaseNodeManager, id: {}, {}", topologyId, nodeId );
+ this.nodeId = nodeId;
+ this.topologyId = topologyId;
+ this.actorSystem = actorSystem;
+ this.delegate = delegateFactory.create(nodeId, topologyId, actorSystem);
+ // if we want to override the place election happens,
+ // we need to override this with noop election strategy and implement election in callback
+ // cannot leak "this" here! have to use TypedActor.self()
+ roleChangeStrategy.registerRoleCandidate((NodeManager) TypedActor.self());
+ }
+
+ @Nonnull @Override public Node getInitialState(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) {
+ LOG.trace("Retrieving Node {} initial state", nodeId);
+ return delegate.getInitialState(nodeId, configNode);
+ }
+
+ @Nonnull @Override public Node getFailedState(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) {
+ LOG.trace("Retrieving Node {} failed state", nodeId);
+ return delegate.getFailedState(nodeId, configNode);
+ }
+
+ @Nonnull @Override public ListenableFuture<Node> onNodeCreated(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) {
+ LOG.debug("Creating Node {}, with configuration: {}", nodeId.getValue(), configNode);
+ return delegate.onNodeCreated(nodeId, configNode);
+ }
+
+ @Nonnull @Override public ListenableFuture<Node> onNodeUpdated(@Nonnull final NodeId nodeId, @Nonnull final Node configNode) {
+ LOG.debug("Updating Node {}, with configuration: {}", nodeId.getValue(), configNode);
+ return delegate.onNodeUpdated(nodeId, configNode);
+ }
+
+ @Nonnull @Override public ListenableFuture<Void> onNodeDeleted(@Nonnull final NodeId nodeId) {
+ LOG.debug("Deleting Node {}", nodeId.getValue());
+ return delegate.onNodeDeleted(nodeId);
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull NodeId nodeId) {
+ LOG.debug("Getting current status for node: {}", nodeId.getValue());
+ return delegate.getCurrentStatusForNode(nodeId);
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+ LOG.debug("Node {} role has changed from: {} to {}", nodeId,
+ (roleChangeDTO.wasOwner() ? "master" : "slave"),
+ (roleChangeDTO.isOwner() ? "master" : "slave"));
+
+ isMaster = roleChangeDTO.isOwner();
+ delegate.onRoleChanged(roleChangeDTO);
+ }
+
+ @Override
+ public void onReceive(Object o, ActorRef actorRef) {
+
+ }
+
+ @Override
+ public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
+ return null;
+ }
+
+ @Override
+ public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
+ return null;
+ }
+
+ @Override
+ public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
+ return null;
+ }
+
+ /**
+ * Builder of BaseNodeManager instances that are proxied as TypedActors
+ */
+ public static class BaseNodeManagerBuilder {
+ private String nodeId;
+ private String topologyId;
+ private NodeManagerCallbackFactory delegateFactory;
+ private RoleChangeStrategy roleChangeStrategy;
+ private ActorContext actorContext;
+
+
+ public BaseNodeManagerBuilder setNodeId(final String nodeId) {
+ this.nodeId = nodeId;
+ return this;
+ }
+
+ public BaseNodeManagerBuilder setTopologyId(final String topologyId) {
+ this.topologyId = topologyId;
+ return this;
+ }
+
+ public BaseNodeManagerBuilder setDelegateFactory(final NodeManagerCallbackFactory delegateFactory) {
+ this.delegateFactory = delegateFactory;
+ return this;
+ }
+
+ public BaseNodeManagerBuilder setRoleChangeStrategy(final RoleChangeStrategy roleChangeStrategy) {
+ this.roleChangeStrategy = roleChangeStrategy;
+ return this;
+ }
+
+ public BaseNodeManagerBuilder setActorContext(final ActorContext actorContext) {
+ this.actorContext = actorContext;
+ return this;
+ }
+
+ public NodeManager build() {
+ Preconditions.checkNotNull(nodeId);
+ Preconditions.checkNotNull(topologyId);
+ Preconditions.checkNotNull(delegateFactory);
+ Preconditions.checkNotNull(roleChangeStrategy);
+ Preconditions.checkNotNull(actorContext);
+ LOG.debug("Creating typed actor with id: {}", nodeId);
+
+ return TypedActor.get(actorContext).typedActorOf(new TypedProps<>(NodeManager.class, new Creator<BaseNodeManager>() {
+ @Override
+ public BaseNodeManager create() throws Exception {
+ return new BaseNodeManager(nodeId, topologyId, actorContext.system(), delegateFactory, roleChangeStrategy);
+ }
+ }), nodeId);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.TypedActor;
+import akka.actor.TypedActorExtension;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberEvent;
+import akka.cluster.ClusterEvent.MemberExited;
+import akka.cluster.ClusterEvent.MemberRemoved;
+import akka.cluster.ClusterEvent.MemberUp;
+import akka.cluster.ClusterEvent.ReachableMember;
+import akka.cluster.ClusterEvent.UnreachableMember;
+import akka.cluster.Member;
+import akka.dispatch.OnComplete;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.netconf.topology.NodeManager;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.netconf.topology.StateAggregator;
+import org.opendaylight.netconf.topology.TopologyManager;
+import org.opendaylight.netconf.topology.TopologyManagerCallback;
+import org.opendaylight.netconf.topology.TopologyManagerCallback.TopologyManagerCallbackFactory;
+import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessage;
+import org.opendaylight.netconf.topology.util.messages.CustomIdentifyMessageReply;
+import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+public final class BaseTopologyManager
+ implements TopologyManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseTopologyManager.class);
+
+ private final ActorSystem system;
+ private final TypedActorExtension typedExtension;
+ private final Cluster clusterExtension;
+
+ private final BindingNormalizedNodeCodecRegistry codecRegistry;
+
+ private static final String PATH = "/user/";
+
+ private final DataBroker dataBroker;
+ private final RoleChangeStrategy roleChangeStrategy;
+ private final StateAggregator aggregator;
+
+ private final NodeWriter naSalNodeWriter;
+ private final String topologyId;
+ private final TopologyManagerCallback delegateTopologyHandler;
+
+ private final Map<NodeId, NodeManager> nodes = new HashMap<>();
+ private final Map<Address, TopologyManager> peers = new HashMap<>();
+ private TopologyManager masterPeer = null;
+ private final int id = new Random().nextInt();
+
+ private boolean isMaster;
+
+ public BaseTopologyManager(final ActorSystem system,
+ final BindingNormalizedNodeCodecRegistry codecRegistry,
+ final DataBroker dataBroker,
+ final String topologyId,
+ final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
+ final StateAggregator aggregator,
+ final NodeWriter naSalNodeWriter,
+ final RoleChangeStrategy roleChangeStrategy) {
+ this(system, codecRegistry, dataBroker, topologyId, topologyManagerCallbackFactory, aggregator, naSalNodeWriter, roleChangeStrategy, false);
+ }
+
+ public BaseTopologyManager(final ActorSystem system,
+ final BindingNormalizedNodeCodecRegistry codecRegistry,
+ final DataBroker dataBroker,
+ final String topologyId,
+ final TopologyManagerCallbackFactory topologyManagerCallbackFactory,
+ final StateAggregator aggregator,
+ final NodeWriter naSalNodeWriter,
+ final RoleChangeStrategy roleChangeStrategy,
+ final boolean isMaster) {
+
+ this.system = system;
+ this.typedExtension = TypedActor.get(system);
+ this.clusterExtension = Cluster.get(system);
+ this.dataBroker = dataBroker;
+ this.topologyId = topologyId;
+ this.delegateTopologyHandler = topologyManagerCallbackFactory.create(system, topologyId);
+ this.aggregator = aggregator;
+ this.naSalNodeWriter = naSalNodeWriter;
+ this.roleChangeStrategy = roleChangeStrategy;
+ this.codecRegistry = codecRegistry;
+
+ // election has not yet happened
+ this.isMaster = isMaster;
+
+ LOG.debug("Base manager started ", +id);
+ }
+
+ @Override
+ public void preStart() {
+ LOG.debug("preStart called");
+ // TODO change to enum, master/slave active/standby
+ roleChangeStrategy.registerRoleCandidate(TypedActor.<BaseTopologyManager>self());
+ LOG.debug("candidate registered");
+ clusterExtension.subscribe(TypedActor.context().self(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
+ }
+
+ @Override
+ public void postStop() {
+ LOG.debug("postStop called");
+ clusterExtension.leave(clusterExtension.selfAddress());
+ clusterExtension.unsubscribe(TypedActor.context().self());
+ }
+
+ @Override
+ public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
+ LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
+
+ final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
+
+ if (isMaster) {
+
+ futures.add(delegateTopologyHandler.onNodeCreated(nodeId, node));
+ // only master should call connect on peers and aggregate futures
+ for (TopologyManager topologyManager : peers.values()) {
+ // convert binding into NormalizedNode for transfer
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
+
+ LOG.debug("YangInstanceIdentifier {}", normalizedNodeEntry.getKey());
+ LOG.debug("Value {}", normalizedNodeEntry.getValue());
+
+ // add a future into our futures that gets its completion status from the converted scala future
+ final SettableFuture<Node> settableFuture = SettableFuture.create();
+ futures.add(settableFuture);
+ final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeCreated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
+ scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
+ @Override
+ public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
+ if (failure != null) {
+ settableFuture.setException(failure);
+ return;
+ }
+ final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+ codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
+ final Node value = (Node) fromNormalizedNode.getValue();
+
+ settableFuture.set(value);
+ }
+ }, TypedActor.context().dispatcher());
+ }
+
+ final ListenableFuture<Node> aggregatedFuture = aggregator.combineCreateAttempts(futures);
+ Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(final Node result) {
+ LOG.debug("Futures aggregated succesfully");
+ naSalNodeWriter.init(nodeId, result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ // If the combined connection attempt failed, set the node to connection failed
+ LOG.debug("Futures aggregation failed");
+ naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
+ // FIXME disconnect those which succeeded
+ // just issue a delete on delegateTopologyHandler that gets handled on lower level
+ }
+ }, TypedActor.context().dispatcher());
+
+ //combine peer futures
+ return aggregatedFuture;
+ }
+
+ // trigger create on this slave
+ return delegateTopologyHandler.onNodeCreated(nodeId, node);
+ }
+
+ @Override
+ public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
+ LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
+
+ final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
+
+ // Master needs to trigger onNodeUpdated on peers and combine results
+ if (isMaster) {
+ futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
+ for (TopologyManager topologyManager : peers.values()) {
+ // convert binding into NormalizedNode for transfer
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
+
+ // add a future into our futures that gets its completion status from the converted scala future
+ final SettableFuture<Node> settableFuture = SettableFuture.create();
+ futures.add(settableFuture);
+ final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
+ scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
+ @Override
+ public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
+ if (failure != null) {
+ settableFuture.setException(failure);
+ return;
+ }
+ final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+ codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
+ final Node value = (Node) fromNormalizedNode.getValue();
+
+ settableFuture.set(value);
+ }
+ }, TypedActor.context().dispatcher());
+ }
+
+ final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
+ Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(final Node result) {
+ // FIXME make this (writing state data for nodes) optional and customizable
+ // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
+ naSalNodeWriter.update(nodeId, result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ // If the combined connection attempt failed, set the node to connection failed
+ naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, node));
+ // FIXME disconnect those which succeeded
+ // just issue a delete on delegateTopologyHandler that gets handled on lower level
+ }
+ });
+
+ //combine peer futures
+ return aggregatedFuture;
+ }
+
+ // Trigger update on this slave
+ return delegateTopologyHandler.onNodeUpdated(nodeId, node);
+ }
+
+ private static InstanceIdentifier<Node> getNodeIid(final String topologyId) {
+ final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+ return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId))).child(Node.class);
+ }
+
+ @Override
+ public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
+ final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+
+ // Master needs to trigger delete on peers and combine results
+ if (isMaster) {
+ futures.add(delegateTopologyHandler.onNodeDeleted(nodeId));
+ for (TopologyManager topologyManager : peers.values()) {
+ // add a future into our futures that gets its completion status from the converted scala future
+ final SettableFuture<Void> settableFuture = SettableFuture.create();
+ futures.add(settableFuture);
+ final Future<Void> scalaFuture = topologyManager.onRemoteNodeDeleted(nodeId);
+ scalaFuture.onComplete(new OnComplete<Void>() {
+ @Override
+ public void onComplete(Throwable failure, Void success) throws Throwable {
+ if (failure != null) {
+ settableFuture.setException(failure);
+ return;
+ }
+
+ settableFuture.set(success);
+ }
+ }, TypedActor.context().dispatcher());
+ }
+
+ final ListenableFuture<Void> aggregatedFuture = aggregator.combineDeleteAttempts(futures);
+ Futures.addCallback(aggregatedFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ naSalNodeWriter.delete(nodeId);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ // FIXME unable to disconnect all the connections, what do we do now ?
+ }
+ });
+
+ return aggregatedFuture;
+ }
+
+ // Trigger delete
+ return delegateTopologyHandler.onNodeDeleted(nodeId);
+ }
+
+ @Nonnull
+ @Override
+ public ListenableFuture<Node> getCurrentStatusForNode(@Nonnull final NodeId nodeId) {
+ return delegateTopologyHandler.getCurrentStatusForNode(nodeId);
+ }
+
+ @Override
+ public void onRoleChanged(final RoleChangeDTO roleChangeDTO) {
+ isMaster = roleChangeDTO.isOwner();
+ delegateTopologyHandler.onRoleChanged(roleChangeDTO);
+ if (isMaster) {
+ LOG.debug("Node {} is master now", clusterExtension.selfAddress());
+ clusterExtension.join(clusterExtension.selfAddress());
+ }
+ }
+
+ @Override
+ public Future<Boolean> isMaster() {
+ return new DefaultPromise<Boolean>().success(isMaster).future();
+ }
+
+ @Override
+ public void notifyNodeStatusChange(final NodeId nodeId) {
+ LOG.debug("Connection status has changed on node {}", nodeId.getValue());
+ if (isMaster) {
+ // grab status from all peers and aggregate
+ final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
+ futures.add(delegateTopologyHandler.getCurrentStatusForNode(nodeId));
+ // only master should call connect on peers and aggregate futures
+ for (TopologyManager topologyManager : peers.values()) {
+ // add a future into our futures that gets its completion status from the converted scala future
+ final SettableFuture<Node> settableFuture = SettableFuture.create();
+ futures.add(settableFuture);
+ final Future<NormalizedNodeMessage> scalaFuture = topologyManager.remoteGetCurrentStatusForNode(nodeId);
+ scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
+ @Override
+ public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
+ if (failure != null) {
+ settableFuture.setException(failure);
+ return;
+ }
+ final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+ codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
+ final Node value = (Node) fromNormalizedNode.getValue();
+
+ settableFuture.set(value);
+ }
+ }, TypedActor.context().dispatcher());
+ }
+
+ final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
+ Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(final Node result) {
+ LOG.debug("Futures aggregated succesfully");
+ naSalNodeWriter.update(nodeId, result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ // If the combined connection attempt failed, set the node to connection failed
+ LOG.debug("Futures aggregation failed");
+ naSalNodeWriter.update(nodeId, nodes.get(nodeId).getFailedState(nodeId, null));
+ // FIXME disconnect those which succeeded
+ // just issue a delete on delegateTopologyHandler that gets handled on lower level
+ }
+ });
+ return;
+ }
+ LOG.debug("Not master, forwarding..");
+ for (final TopologyManager manager : peers.values()) {
+ // asynchronously find out which peer is master
+ final Future<Boolean> future = manager.isMaster();
+ future.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable failure, Boolean success) throws Throwable {
+ if (failure == null && success) {
+ LOG.debug("Found master peer");
+ // forward to master
+ manager.notifyNodeStatusChange(nodeId);
+ return;
+ }
+ if (failure != null) {
+ LOG.debug("Retrieving master peer failed, {}", failure);
+ }
+ }
+ }, TypedActor.context().dispatcher());
+ }
+ }
+
+ @Override
+ public boolean hasAllPeersUp() {
+ LOG.debug("Peers needed: {} Peers up: {}", 2, peers.size());
+ LOG.warn(clusterExtension.state().toString());
+ LOG.warn(peers.toString());
+ return peers.size() == 2;
+ }
+
+ @Override
+ public Future<NormalizedNodeMessage> onRemoteNodeCreated(final NormalizedNodeMessage message) {
+ final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+ codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
+ final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
+ final Node value = (Node) fromNormalizedNode.getValue();
+
+ LOG.debug("TopologyManager({}) onRemoteNodeCreated received, nodeid: {}", value.getNodeId(), value);
+ final ListenableFuture<Node> nodeListenableFuture = onNodeCreated(value.getNodeId(), value);
+ final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
+ Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
+ promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+
+ return promise.future();
+ }
+
+ @Override
+ public Future<NormalizedNodeMessage> onRemoteNodeUpdated(final NormalizedNodeMessage message) {
+ final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
+ codecRegistry.fromNormalizedNode(message.getIdentifier(), message.getNode());
+ final InstanceIdentifier<Node> iid = (InstanceIdentifier<Node>) fromNormalizedNode.getKey();
+ final Node value = (Node) fromNormalizedNode.getValue();
+
+ LOG.debug("TopologyManager({}) onRemoteNodeUpdated received, nodeid: {}", id, value.getNodeId());
+
+ final ListenableFuture<Node> nodeListenableFuture = onNodeUpdated(value.getNodeId(), value);
+ final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
+ Futures.addCallback(nodeListenableFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(iid, result);
+ promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ public Future<Void> onRemoteNodeDeleted(final NodeId nodeId) {
+ LOG.debug("TopologyManager({}) onRemoteNodeDeleted received, nodeid: {}", id, nodeId.getValue());
+
+ final ListenableFuture<Void> listenableFuture = onNodeDeleted(nodeId);
+ final DefaultPromise<Void> promise = new DefaultPromise<>();
+ Futures.addCallback(listenableFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ promise.success(null);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+
+ return promise.future();
+ }
+
+ public Future<NormalizedNodeMessage> remoteGetCurrentStatusForNode(final NodeId nodeId) {
+ LOG.debug("TopologyManager({}) remoteGetCurrentStatusForNode received, nodeid: {}", id, nodeId.getValue());
+
+ final ListenableFuture<Node> listenableFuture = getCurrentStatusForNode(nodeId);
+ final DefaultPromise<NormalizedNodeMessage> promise = new DefaultPromise<>();
+ Futures.addCallback(listenableFuture, new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), result);
+ promise.success(new NormalizedNodeMessage(entry.getKey(), entry.getValue()));
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ promise.failure(t);
+ }
+ });
+ return promise.future();
+ }
+
+ @Override
+ public void onReceive(final Object message, final ActorRef actorRef) {
+ LOG.debug("message received {}", message);
+ if (message instanceof MemberUp) {
+ final Member member = ((MemberUp) message).member();
+ LOG.info("Member is Up: {}", member);
+ if (member.address().equals(clusterExtension.selfAddress())) {
+ return;
+ }
+
+ final String path = member.address() + PATH + topologyId;
+ LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
+
+ clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
+ } else if (message instanceof MemberExited) {
+ // remove peer
+ final Member member = ((MemberExited) message).member();
+ LOG.info("Member exited cluster: {}", member);
+ peers.remove(member.address());
+ } else if (message instanceof MemberRemoved) {
+ // remove peer
+ final Member member = ((MemberRemoved) message).member();
+ LOG.info("Member was removed from cluster: {}", member);
+ peers.remove(member.address());
+ } else if (message instanceof UnreachableMember) {
+ // remove peer
+ final Member member = ((UnreachableMember) message).member();
+ LOG.info("Member is unreachable: {}", member);
+ peers.remove(member.address());
+ } else if (message instanceof ReachableMember) {
+ // resync peer
+ final Member member = ((ReachableMember) message).member();
+ LOG.info("Member is reachable again: {}", member);
+
+ if (member.address().equals(clusterExtension.selfAddress())) {
+ return;
+ }
+
+ final String path = member.address() + PATH + topologyId;
+ LOG.debug("Actor at :{} is resolving topology actor for path {}", clusterExtension.selfAddress(), path);
+
+ clusterExtension.system().actorSelection(path).tell(new CustomIdentifyMessage(clusterExtension.selfAddress()), TypedActor.context().self());
+ } else if (message instanceof CustomIdentifyMessageReply) {
+ LOG.debug("Received a custom identify reply message from: {}", ((CustomIdentifyMessageReply) message).getAddress());
+ if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
+ final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
+ peers.put(((CustomIdentifyMessageReply) message).getAddress(), peer);
+ }
+ } else if (message instanceof CustomIdentifyMessage) {
+ LOG.debug("Received a custom identify message from: {}", ((CustomIdentifyMessage) message).getAddress());
+ if (!peers.containsKey(((CustomIdentifyMessage) message).getAddress())) {
+ final TopologyManager peer = typedExtension.typedActorOf(new TypedProps<>(TopologyManager.class, BaseTopologyManager.class), actorRef);
+ peers.put(((CustomIdentifyMessage) message).getAddress(), peer);
+ }
+ actorRef.tell(new CustomIdentifyMessageReply(clusterExtension.selfAddress()), TypedActor.context().self());
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.netconf.topology.NodeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NodeRoleChangeStrategy implements RoleChangeStrategy, EntityOwnershipListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NodeRoleChangeStrategy.class);
+
+ private final EntityOwnershipService entityOwnershipService;
+ private final String entityType;
+ private final String entityName;
+ private NodeListener ownershipCandidate;
+
+ private EntityOwnershipCandidateRegistration candidateRegistration = null;
+ private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
+
+ public NodeRoleChangeStrategy(final EntityOwnershipService entityOwnershipService,
+ final String entityType,
+ final String entityName) {
+ this.entityOwnershipService = entityOwnershipService;
+ this.entityType = entityType + "/" + entityName;
+ this.entityName = entityName;
+ }
+
+ @Override
+ public void registerRoleCandidate(NodeListener electionCandidate) {
+ LOG.debug("Registering role candidate type: {} , name: {}", entityType, entityName);
+ this.ownershipCandidate = electionCandidate;
+ try {
+ if (candidateRegistration != null) {
+ unregisterRoleCandidate();
+ }
+ candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+ ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
+ } catch (CandidateAlreadyRegisteredException e) {
+ LOG.error("Candidate already registered for election", e);
+ throw new IllegalStateException("Candidate already registered for election", e);
+ }
+ }
+
+ @Override
+ public void unregisterRoleCandidate() {
+ LOG.debug("Unregistering role candidate");
+ candidateRegistration.close();
+ candidateRegistration = null;
+ ownershipListenerRegistration.close();
+ ownershipListenerRegistration = null;
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+ LOG.debug("Role was changed {}", roleChangeDTO);
+ ownershipCandidate.onRoleChanged(roleChangeDTO);
+ }
+
+ @Override
+ public void ownershipChanged(EntityOwnershipChange ownershipChange) {
+ LOG.debug("Ownership has changed {}", ownershipChange);
+ ownershipCandidate.onRoleChanged(new RoleChangeDTO(ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner()));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import org.opendaylight.netconf.topology.NodeListener;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+
+/**
+ * Use this strategy to override the default roleChange registration's in BaseTopologyManager|BaseNodeManager
+ * If you use this, you will need to execute your own election in your implemented callbacks.
+ */
+public class NoopRoleChangeStrategy implements RoleChangeStrategy {
+
+ @Override
+ public void registerRoleCandidate(NodeListener electionCandidate) {
+
+ }
+
+ @Override
+ public void unregisterRoleCandidate() {
+
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class SalNodeWriter implements NodeWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SalNodeWriter.class);
+
+ private final DataBroker dataBroker;
+ private final String topologyId;
+
+ private final InstanceIdentifier<Topology> topologyListPath;
+
+ public SalNodeWriter(final DataBroker dataBroker, final String topologyId) {
+ this.dataBroker = dataBroker;
+ this.topologyId = topologyId;
+ this.topologyListPath = createTopologyId(this.topologyId);
+ }
+
+ //FIXME change to txChains
+ @Override public void init(@Nonnull final NodeId id, @Nonnull final Node operationalDataNode) {
+ // put into Datastore
+ final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
+ wTx.put(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id), operationalDataNode);
+ commitTransaction(wTx, id, "init");
+ }
+
+ @Override public void update(@Nonnull final NodeId id, @Nonnull final Node operationalDataNode) {
+ // merge
+ final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
+ wTx.put(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id), operationalDataNode);
+ commitTransaction(wTx, id, "update");
+ }
+
+ @Override public void delete(@Nonnull final NodeId id) {
+ // delete
+ final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
+ wTx.delete(LogicalDatastoreType.OPERATIONAL, createBindingPathForTopology(id));
+ commitTransaction(wTx, id, "delete");
+ }
+
+ private void commitTransaction(final WriteTransaction transaction, final NodeId id, final String txType) {
+ LOG.debug("{}: Committing Transaction {}:{}", id.getValue(), txType,
+ transaction.getIdentifier());
+ final CheckedFuture<Void, TransactionCommitFailedException> result = transaction.submit();
+
+ Futures.addCallback(result, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("{}: Transaction({}) {} SUCCESSFUL", id.getValue(), txType,
+ transaction.getIdentifier());
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("{}: Transaction({}) {} FAILED!", id.getValue(), txType,
+ transaction.getIdentifier(), t);
+ throw new IllegalStateException(id + " Transaction(" + txType + ") not committed correctly", t);
+ }
+ });
+ }
+
+ private InstanceIdentifier<Node> createBindingPathForTopology(final NodeId id) {
+ return topologyListPath.child(Node.class, new NodeKey(id));
+ }
+
+ private InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
+ final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+ return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.util;
+
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.topology.NodeListener;
+import org.opendaylight.netconf.topology.RoleChangeStrategy;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.PathArgument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopologyRoleChangeStrategy implements RoleChangeStrategy, ClusteredDataTreeChangeListener<Node>, EntityOwnershipListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TopologyRoleChangeStrategy.class);
+
+ private final DataBroker dataBroker;
+
+ private final EntityOwnershipService entityOwnershipService;
+ private NodeListener ownershipCandidate;
+ private final String entityType;
+ // use topologyId as entityName
+ private final String entityName;
+
+ private EntityOwnershipCandidateRegistration candidateRegistration = null;
+ private EntityOwnershipListenerRegistration ownershipListenerRegistration = null;
+
+ private ListenerRegistration<TopologyRoleChangeStrategy> datastoreListenerRegistration;
+
+ public TopologyRoleChangeStrategy(final DataBroker dataBroker,
+ final EntityOwnershipService entityOwnershipService,
+ final String entityType,
+ final String entityName) {
+ this.dataBroker = dataBroker;
+ this.entityOwnershipService = entityOwnershipService;
+ this.entityType = entityType;
+ this.entityName = entityName;
+
+ datastoreListenerRegistration = null;
+ }
+
+ @Override
+ public void registerRoleCandidate(NodeListener electionCandidate) {
+ LOG.warn("Registering candidate");
+ ownershipCandidate = electionCandidate;
+ try {
+ if (candidateRegistration != null) {
+ unregisterRoleCandidate();
+ }
+ candidateRegistration = entityOwnershipService.registerCandidate(new Entity(entityType, entityName));
+ ownershipListenerRegistration = entityOwnershipService.registerListener(entityType, this);
+ } catch (CandidateAlreadyRegisteredException e) {
+ LOG.error("Candidate already registered for election", e);
+ throw new IllegalStateException("Candidate already registered for election", e);
+ }
+ }
+
+ @Override
+ public void unregisterRoleCandidate() {
+ candidateRegistration.close();
+ candidateRegistration = null;
+ ownershipListenerRegistration.close();
+ ownershipListenerRegistration = null;
+ }
+
+ @Override
+ public void onRoleChanged(RoleChangeDTO roleChangeDTO) {
+ if (roleChangeDTO.isOwner()) {
+ LOG.warn("Gained ownership of entity, registering datastore listener");
+
+ if (datastoreListenerRegistration == null) {
+ LOG.debug("Listener on path {}", createTopologyId(entityType).child(Node.class).getPathArguments());
+ datastoreListenerRegistration = dataBroker.registerDataTreeChangeListener(
+ new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, createTopologyId(entityType).child(Node.class)), this);
+ }
+ } else if (datastoreListenerRegistration != null) {
+ LOG.warn("No longer owner of entity, unregistering datastore listener");
+ datastoreListenerRegistration.close();
+ datastoreListenerRegistration = null;
+ }
+ ownershipCandidate.onRoleChanged(roleChangeDTO);
+ }
+
+ @Override
+ public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
+ onRoleChanged(new RoleChangeDTO(ownershipChange.wasOwner(), ownershipChange.isOwner(), ownershipChange.hasOwner()));
+ }
+
+ @Override
+ public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> changes) {
+ for (DataTreeModification<Node> change : changes) {
+ final DataObjectModification<Node> rootNode = change.getRootNode();
+ switch (rootNode.getModificationType()) {
+ case WRITE:
+ LOG.debug("Data was Created {}, {}", rootNode.getIdentifier(), rootNode.getDataAfter());
+ ownershipCandidate.onNodeCreated(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+ break;
+ case SUBTREE_MODIFIED:
+ LOG.debug("Data was Updated {}, {}", rootNode.getIdentifier(), rootNode.getDataAfter());
+ ownershipCandidate.onNodeUpdated(getNodeId(rootNode.getIdentifier()), rootNode.getDataAfter());
+ break;
+ case DELETE:
+ LOG.debug("Data was Deleted {}", rootNode.getIdentifier());
+ ownershipCandidate.onNodeDeleted(getNodeId(rootNode.getIdentifier()));
+ }
+ }
+ }
+
+ /**
+ * Determines the Netconf Node Node ID, given the node's instance
+ * identifier.
+ *
+ * @param pathArgument Node's path arument
+ * @return NodeId for the node
+ */
+ private NodeId getNodeId(final PathArgument pathArgument) {
+ if (pathArgument instanceof InstanceIdentifier.IdentifiableItem<?, ?>) {
+
+ final Identifier key = ((InstanceIdentifier.IdentifiableItem) pathArgument).getKey();
+ if(key instanceof NodeKey) {
+ return ((NodeKey) key).getNodeId();
+ }
+ }
+ throw new IllegalStateException("Unable to create NodeId from: " + pathArgument);
+ }
+
+ private static InstanceIdentifier<Topology> createTopologyId(final String topologyId) {
+ final InstanceIdentifier<NetworkTopology> networkTopology = InstanceIdentifier.create(NetworkTopology.class);
+ return networkTopology.child(Topology.class, new TopologyKey(new TopologyId(topologyId)));
+ }
+}