import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
private final NodeWriter naSalNodeWriter;
private final String topologyId;
private final TopologyManagerCallback delegateTopologyHandler;
+ private final Set<NodeId> created = new HashSet<>();
private final Map<Address, TopologyManager> peers = new HashMap<>();
private TopologyManager masterPeer = null;
public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
+ if (created.contains(nodeId)) {
+ LOG.warn("Node{} already exists, triggering update..", nodeId);
+ return onNodeUpdated(nodeId, node);
+ }
+ created.add(nodeId);
final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
if (isMaster) {
public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
- final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
-
// Master needs to trigger onNodeUpdated on peers and combine results
if (isMaster) {
- futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
- for (TopologyManager topologyManager : peers.values()) {
- // convert binding into NormalizedNode for transfer
- final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
-
- // add a future into our futures that gets its completion status from the converted scala future
- final SettableFuture<Node> settableFuture = SettableFuture.create();
- futures.add(settableFuture);
- final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
- scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
- @Override
- public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
- if (failure != null) {
- settableFuture.setException(failure);
- return;
+ // first cleanup old node
+ final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
+ final SettableFuture<Node> createFuture = SettableFuture.create();
+ final TopologyManager selfProxy = TypedActor.self();
+ final ActorContext context = TypedActor.context();
+ Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ LOG.warn("Delete part of update succesfull, triggering create");
+ // trigger create on all nodes
+ Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ createFuture.set(result);
}
- final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
- codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
- final Node value = (Node) fromNormalizedNode.getValue();
- settableFuture.set(value);
- }
- }, TypedActor.context().dispatcher());
- }
-
- final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
- Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
- @Override
- public void onSuccess(final Node result) {
- // FIXME make this (writing state data for nodes) optional and customizable
- // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
- naSalNodeWriter.update(nodeId, result);
+ @Override
+ public void onFailure(Throwable t) {
+ createFuture.setException(t);
+ }
+ }, context.dispatcher());
}
@Override
- public void onFailure(final Throwable t) {
- // If the combined connection attempt failed, set the node to connection failed
- naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
- // FIXME disconnect those which succeeded
- // just issue a delete on delegateTopologyHandler that gets handled on lower level
+ public void onFailure(Throwable t) {
+ LOG.warn("Delete part of update failed, {}", t);
}
- });
-
- //combine peer futures
- return aggregatedFuture;
+ }, context.dispatcher());
+ return createFuture;
}
// Trigger update on this slave
@Override
public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+ created.remove(nodeId);
// Master needs to trigger delete on peers and combine results
if (isMaster) {
import akka.actor.TypedActorExtension;
import akka.actor.TypedProps;
import akka.japi.Creator;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;
import javassist.ClassPool;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.AvailableCapabilitiesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.ClusteredConnectionStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.UnavailableCapabilitiesBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatus.Status;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.clustered.connection.status.NodeStatusBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.unavailable.capabilities.UnavailableCapability;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
@Mock
private DataBroker dataBroker;
+ @Mock
+ private ReadOnlyTransaction mockedReadOnlyTx;
+
private static final BindingNormalizedNodeCodecRegistry CODEC_REGISTRY;
static {
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
+ final SettableFuture<Optional<Topology>> settableFuture = SettableFuture.create();
+ final CheckedFuture<Optional<Topology>, ReadFailedException> checkedFuture = Futures.makeChecked(settableFuture, new Function<Exception, ReadFailedException>() {
+ @Nullable
+ @Override
+ public ReadFailedException apply(Exception input) {
+ return new ReadFailedException("Dummy future should never return this");
+ }
+ });
+ settableFuture.set(Optional.<Topology>absent());
+ when(mockedReadOnlyTx.read(any(LogicalDatastoreType.class), any(InstanceIdentifier.class))).thenReturn(checkedFuture);
when(dataBroker.registerDataChangeListener(
any(LogicalDatastoreType.class),
any(InstanceIdentifier.class),
any(DataChangeListener.class),
any(DataChangeScope.class))).thenReturn(null);
+ when(dataBroker.newReadOnlyTransaction()).thenReturn(mockedReadOnlyTx);
}
private void setMaster(final TopologyManager manager) {
}
});
}
+ LOG.debug("Waiting for updates to finish");
+ Futures.allAsList(futures).get();
final List<ListenableFuture<Void>> deleteFutures = new ArrayList<>();
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.setConnectionStatus(ConnectionStatus.Connecting)
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.setHost(netconfNode.getHost())
.setPort(netconfNode.getPort())
.setConnectionStatus(ConnectionStatus.UnableToConnect)
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.setConnectionStatus(ConnectionStatus.Connected)
.setHost(augmentation.getHost())
.setPort(augmentation.getPort())
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.setConnectionStatus(ConnectionStatus.Connected)
.setHost(augmentation.getHost())
.setPort(augmentation.getPort())
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.setClusteredConnectionStatus(
new ClusteredConnectionStatusBuilder()
.setNodeStatus(
.setConnectionStatus(ConnectionStatus.Connected)
.setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
.setPort(new PortNumber(2555))
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.build())
.build());
}
.setConnectionStatus(ConnectionStatus.Connected)
.setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
.setPort(new PortNumber(65535))
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.build())
.build());
}
.setConnectionStatus(ConnectionStatus.Connecting)
.setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
.setPort(new PortNumber(65535))
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.build())
.build();
}
.setConnectionStatus(ConnectionStatus.UnableToConnect)
.setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
.setPort(new PortNumber(65535))
+ .setAvailableCapabilities(new AvailableCapabilitiesBuilder().setAvailableCapability(new ArrayList<String>()).build())
+ .setUnavailableCapabilities(new UnavailableCapabilitiesBuilder().setUnavailableCapability(new ArrayList<UnavailableCapability>()).build())
.build())
.build();
}