import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
private final NodeWriter naSalNodeWriter;
private final String topologyId;
private final TopologyManagerCallback delegateTopologyHandler;
+ private final Set<NodeId> created = new HashSet<>();
private final Map<Address, TopologyManager> peers = new HashMap<>();
private TopologyManager masterPeer = null;
public ListenableFuture<Node> onNodeCreated(final NodeId nodeId, final Node node) {
LOG.debug("TopologyManager({}) onNodeCreated received, nodeid: {} , isMaster: {}", id, nodeId.getValue(), isMaster);
+ if (created.contains(nodeId)) {
+ LOG.warn("Node{} already exists, triggering update..", nodeId);
+ return onNodeUpdated(nodeId, node);
+ }
+ created.add(nodeId);
final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
if (isMaster) {
public ListenableFuture<Node> onNodeUpdated(final NodeId nodeId, final Node node) {
LOG.debug("TopologyManager({}) onNodeUpdated received, nodeid: {}", id, nodeId.getValue());
- final ArrayList<ListenableFuture<Node>> futures = new ArrayList<>();
-
// Master needs to trigger onNodeUpdated on peers and combine results
if (isMaster) {
- futures.add(delegateTopologyHandler.onNodeUpdated(nodeId, node));
- for (TopologyManager topologyManager : peers.values()) {
- // convert binding into NormalizedNode for transfer
- final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> normalizedNodeEntry = codecRegistry.toNormalizedNode(getNodeIid(topologyId), node);
-
- // add a future into our futures that gets its completion status from the converted scala future
- final SettableFuture<Node> settableFuture = SettableFuture.create();
- futures.add(settableFuture);
- final Future<NormalizedNodeMessage> scalaFuture = topologyManager.onRemoteNodeUpdated(new NormalizedNodeMessage(normalizedNodeEntry.getKey(), normalizedNodeEntry.getValue()));
- scalaFuture.onComplete(new OnComplete<NormalizedNodeMessage>() {
- @Override
- public void onComplete(Throwable failure, NormalizedNodeMessage success) throws Throwable {
- if (failure != null) {
- settableFuture.setException(failure);
- return;
+ // first cleanup old node
+ final ListenableFuture<Void> deleteFuture = onNodeDeleted(nodeId);
+ final SettableFuture<Node> createFuture = SettableFuture.create();
+ final TopologyManager selfProxy = TypedActor.self();
+ final ActorContext context = TypedActor.context();
+ Futures.addCallback(deleteFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ LOG.warn("Delete part of update succesfull, triggering create");
+ // trigger create on all nodes
+ Futures.addCallback(selfProxy.onNodeCreated(nodeId, node), new FutureCallback<Node>() {
+ @Override
+ public void onSuccess(Node result) {
+ createFuture.set(result);
}
- final Entry<InstanceIdentifier<?>, DataObject> fromNormalizedNode =
- codecRegistry.fromNormalizedNode(success.getIdentifier(), success.getNode());
- final Node value = (Node) fromNormalizedNode.getValue();
- settableFuture.set(value);
- }
- }, TypedActor.context().dispatcher());
- }
-
- final ListenableFuture<Node> aggregatedFuture = aggregator.combineUpdateAttempts(futures);
- Futures.addCallback(aggregatedFuture, new FutureCallback<Node>() {
- @Override
- public void onSuccess(final Node result) {
- // FIXME make this (writing state data for nodes) optional and customizable
- // this should be possible with providing your own NodeWriter implementation, maybe rename this interface?
- naSalNodeWriter.update(nodeId, result);
+ @Override
+ public void onFailure(Throwable t) {
+ createFuture.setException(t);
+ }
+ }, context.dispatcher());
}
@Override
- public void onFailure(final Throwable t) {
- // If the combined connection attempt failed, set the node to connection failed
- naSalNodeWriter.update(nodeId, delegateTopologyHandler.getFailedState(nodeId, node));
- // FIXME disconnect those which succeeded
- // just issue a delete on delegateTopologyHandler that gets handled on lower level
+ public void onFailure(Throwable t) {
+ LOG.warn("Delete part of update failed, {}", t);
}
- });
-
- //combine peer futures
- return aggregatedFuture;
+ }, context.dispatcher());
+ return createFuture;
}
// Trigger update on this slave
@Override
public ListenableFuture<Void> onNodeDeleted(final NodeId nodeId) {
final ArrayList<ListenableFuture<Void>> futures = new ArrayList<>();
+ created.remove(nodeId);
// Master needs to trigger delete on peers and combine results
if (isMaster) {