import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.Connected;
import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.Connecting;
+import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.UnableToConnect;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.MountPoint;
import org.opendaylight.controller.md.sal.binding.api.MountPointService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.groupbasedpolicy.renderer.vpp.nat.NatUtil;
import org.opendaylight.groupbasedpolicy.renderer.vpp.util.VppIidFactory;
+import org.opendaylight.groupbasedpolicy.renderer.vpp.util.VppRendererProcessingException;
import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana._if.type.rev140508.EthernetCsmacd;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.CheckedFuture;
public class VppNodeManager {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
this.mountService = Preconditions.checkNotNull(session.getSALService(MountPointService.class));
requiredCapabilities = initializeRequiredCapabilities();
- if (!Strings.isNullOrEmpty(physicalInterfaces) && physicalInterfaces != NO_PUBLIC_INT_SPECIFIED) {
+ if (!Strings.isNullOrEmpty(physicalInterfaces) && !Objects.equals(physicalInterfaces, NO_PUBLIC_INT_SPECIFIED)) {
loadPhysicalInterfaces(physicalInterfaces);
}
}
/**
* Caches list of physical interfaces.
*/
- public void loadPhysicalInterfaces(@Nonnull String physicalInterfaces) {
+ private void loadPhysicalInterfaces(@Nonnull String physicalInterfaces) {
for (String intfOnNode : Sets.newConcurrentHashSet(Splitter.on(",").split(physicalInterfaces))) {
List<String> entries = Lists.newArrayList(Splitter.on(":").split(intfOnNode));
if (entries.size() != 2) {
* Synchronizes nodes to DataStore based on their modification state which results in
* create/update/remove of Node.
*/
- public void syncNodes(Node dataAfter, Node dataBefore) {
+ public void syncNodes(final Node dataAfter, final Node dataBefore) {
if (isControllerConfigNode(dataAfter, dataBefore)) {
LOG.trace("{} is ignored by VPP-renderer", CONTROLLER_CONFIG_NODE);
return;
}
+ ListenableFuture<String> syncFuture = Futures.immediateFuture(null);
// New node
if (dataBefore == null && dataAfter != null) {
- createNode(dataAfter);
+ syncFuture = createNode(dataAfter);
}
// Connected/disconnected node
- if (dataBefore != null && dataAfter != null) {
- updateNode(dataAfter);
+ else if (dataBefore != null && dataAfter != null) {
+ syncFuture = updateNode(dataAfter);
}
// Removed node
- if (dataBefore != null && dataAfter == null) {
- removeNode(dataBefore);
+ else if (dataBefore != null) {
+ syncFuture = removeNode(dataBefore);
}
+ Futures.addCallback(syncFuture, new FutureCallback<String>() {
+ @Override
+ public void onSuccess(@Nullable String message) {
+ LOG.info("Node synchronization completed. {} ", message);
+ }
+
+ @Override
+ public void onFailure(@Nonnull Throwable t) {
+ LOG.warn("Node synchronization failed. Data before: {} after {}", dataBefore, dataAfter);
+ }
+ });
}
- private boolean isControllerConfigNode(Node dataAfter, Node dataBefore) {
+ private boolean isControllerConfigNode(final Node dataAfter, final Node dataBefore) {
if (dataAfter != null) {
return CONTROLLER_CONFIG_NODE.equals(dataAfter.getNodeId());
}
return CONTROLLER_CONFIG_NODE.equals(dataBefore.getNodeId());
}
- private void createNode(Node node) {
- LOG.info("Registering new node {}", node.getNodeId().getValue());
- NetconfNode netconfNode = getNodeAugmentation(node);
+ private ListenableFuture<String> createNode(final Node node) {
+ final String nodeId = node.getNodeId().getValue();
+ LOG.info("Registering new node {}", nodeId);
+ final NetconfNode netconfNode = getNodeAugmentation(node);
if (netconfNode == null) {
- return;
+ final String message = String.format("Node %s is not an netconf node", nodeId);
+ return Futures.immediateFuture(message);
}
- NetconfNodeConnectionStatus.ConnectionStatus connectionStatus = netconfNode.getConnectionStatus();
+ final NetconfNodeConnectionStatus.ConnectionStatus connectionStatus = netconfNode.getConnectionStatus();
switch (connectionStatus) {
- case Connecting:
- LOG.info("Connecting device {} ...", node.getNodeId().getValue());
- break;
- case Connected:
- resolveConnectedNode(node, netconfNode);
- break;
- default:
- break;
+ case Connecting: {
+ final String message = String.format("Connecting device %s ...", nodeId);
+ return Futures.immediateFuture(message);
+ }
+ case Connected: {
+ return resolveConnectedNode(node, netconfNode);
+ }
+ case UnableToConnect: {
+ final String message = String.format("Connection status is unable to connect for node %s", nodeId);
+ return Futures.immediateFuture(message);
+ }
+ default: {
+ final String message = String.format("Unknown connection status for node %s", nodeId);
+ return Futures.immediateFailedFuture(new VppRendererProcessingException(message));
+ }
}
}
- private void updateNode(Node node) {
- LOG.info("Updating node {}", node.getNodeId());
- NetconfNode netconfNode = getNodeAugmentation(node);
- if (netconfNode == null || netconfNode.getConnectionStatus() == null) {
- return;
- }
- NetconfNodeConnectionStatus.ConnectionStatus afterNodeStatus = netconfNode.getConnectionStatus();
- if (afterNodeStatus.equals(Connected)) {
- resolveConnectedNode(node, netconfNode);
+ private ListenableFuture<String> updateNode(final Node node) {
+ final String nodeId = node.getNodeId().getValue();
+ LOG.info("Updating node {}", nodeId);
+ final NetconfNode netconfNode = getNodeAugmentation(node);
+ if (netconfNode == null) {
+ final String message = String.format("Node %s is not an netconf node", nodeId);
+ return Futures.immediateFuture(message);
}
- if (afterNodeStatus.equals(Connecting)) {
- resolveDisconnectedNode(node);
- LOG.info("Node {} is disconnected, removing from available nodes", node.getNodeId().getValue());
+ final NetconfNodeConnectionStatus.ConnectionStatus afterNodeStatus = netconfNode.getConnectionStatus();
+ if (Connected.equals(afterNodeStatus)) {
+ return resolveConnectedNode(node, netconfNode);
+ } else if (Connecting.equals(afterNodeStatus)) {
+ final String cause = String.format("Node %s is disconnected, removing from available nodes", nodeId);
+ return resolveDisconnectedNode(node, cause);
+ } else if (UnableToConnect.equals(afterNodeStatus)) {
+ final String cause = String.format("New node %s status is unable to connect, removing from available nodes",
+ nodeId);
+ return resolveDisconnectedNode(node, cause);
+ } else {
+ final String cause = String.format("New node status is unknown. Node %s will be removed from available nodes",
+ nodeId);
+ return resolveDisconnectedNode(node, cause);
}
}
- private void removeNode(Node node) {
- resolveDisconnectedNode(node);
- LOG.info("Node {} is removed", node.getNodeId().getValue());
+ private ListenableFuture<String> removeNode(final Node node) {
+ final String cause = String.format("Node %s is removed", node.getNodeId().getValue());
+ return resolveDisconnectedNode(node, cause);
}
- private void resolveConnectedNode(Node node, NetconfNode netconfNode) {
- InstanceIdentifier<Node> mountPointIid = getMountpointIid(node);
- RendererNode rendererNode = remapNode(mountPointIid);
+ private ListenableFuture<String> resolveConnectedNode(final Node node, final NetconfNode netconfNode) {
+ final String nodeId = node.getNodeId().getValue();
+ final InstanceIdentifier<Node> mountPointIid = getMountpointIid(node);
+ final RendererNode rendererNode = remapNode(mountPointIid);
if (!isCapableNetconfDevice(node, netconfNode)) {
- LOG.warn("Node {} is not connected.", node.getNodeId().getValue());
- return;
+ final String message = String.format("Node %s is not connected", nodeId);
+ return Futures.immediateFuture(message);
}
final DataBroker mountpoint = getNodeMountPoint(mountPointIid);
if (mountpoint == null) {
- LOG.warn("Mountpoint not available for node {}", node.getNodeId().getValue());
- return;
+ final String message = String.format("Mountpoint not available for node %s", nodeId);
+ return Futures.immediateFuture(message);
}
final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
wTx.put(LogicalDatastoreType.OPERATIONAL, VppIidFactory.getRendererNodeIid(rendererNode), rendererNode, true);
- DataStoreHelper.submitToDs(wTx);
- LOG.info("Node {} is capable and ready.", node.getNodeId().getValue());
- syncPhysicalInterfacesInLocalDs(mountpoint, mountPointIid);
- NatUtil.resolveOutboundNatInterface(mountpoint, mountPointIid, node.getNodeId(), extInterfaces);
+ final boolean submit = DataStoreHelper.submitToDs(wTx);
+ if (submit) {
+ final String message = String.format("Node %s is capable and ready", nodeId);
+ syncPhysicalInterfacesInLocalDs(mountpoint, mountPointIid);
+ NatUtil.resolveOutboundNatInterface(mountpoint, mountPointIid, node.getNodeId(), extInterfaces);
+ return Futures.immediateFuture(message);
+ } else {
+ final String message = String.format("Failed to resolve connected node %s", nodeId);
+ return Futures.immediateFuture(message);
+ }
}
- private void resolveDisconnectedNode(Node node) {
- InstanceIdentifier<Node> mountPointIid = getMountpointIid(node);
- RendererNode rendererNode = remapNode(mountPointIid);
+ private ListenableFuture<String> resolveDisconnectedNode(final Node node, final String cause) {
+ final InstanceIdentifier<Node> mountPointIid = getMountpointIid(node);
+ final RendererNode rendererNode = remapNode(mountPointIid);
final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
wTx.delete(LogicalDatastoreType.OPERATIONAL, VppIidFactory.getRendererNodeIid(rendererNode));
- CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
+ final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture = wTx.submit();
try {
- submitFuture.checkedGet();
+ checkedFuture.checkedGet();
+ return Futures.immediateFuture(cause);
} catch (TransactionCommitFailedException e) {
- LOG.error("Write transaction failed to {}", e.getMessage());
- } catch (Exception e) {
- LOG.error("Failed to .. {}", e.getMessage());
+ final String message = String.format("Failed to resolve disconnected node %s", node.getNodeId().getValue());
+ return Futures.immediateFailedFuture(new VppRendererProcessingException(message));
}
}
@Nullable
- private DataBroker getNodeMountPoint(InstanceIdentifier<Node> mountPointIid) {
+ private DataBroker getNodeMountPoint(final InstanceIdentifier<Node> mountPointIid) {
final Future<Optional<MountPoint>> futureOptionalObject = getMountpointFromSal(mountPointIid);
try {
final Optional<MountPoint> optionalObject = futureOptionalObject.get();
}
}
- private RendererNode remapNode(InstanceIdentifier<Node> path) {
- RendererNodeBuilder rendererNodeBuilder = new RendererNodeBuilder();
+ private RendererNode remapNode(final InstanceIdentifier<Node> path) {
+ final RendererNodeBuilder rendererNodeBuilder = new RendererNodeBuilder();
rendererNodeBuilder.setKey(new RendererNodeKey(path)).setNodePath(path);
return rendererNodeBuilder.build();
}
- private InstanceIdentifier<Node> getMountpointIid(Node node) {
+ private InstanceIdentifier<Node> getMountpointIid(final Node node) {
return InstanceIdentifier.builder(NetworkTopology.class)
- .child(Topology.class, new TopologyKey(TOPOLOGY_ID))
- .child(Node.class, new NodeKey(node.getNodeId()))
- .build();
+ .child(Topology.class, new TopologyKey(TOPOLOGY_ID))
+ .child(Node.class, new NodeKey(node.getNodeId()))
+ .build();
}
- private boolean isCapableNetconfDevice(Node node, NetconfNode netconfAugmentation) {
+ private boolean isCapableNetconfDevice(final Node node, final NetconfNode netconfAugmentation) {
if (netconfAugmentation.getAvailableCapabilities() == null
|| netconfAugmentation.getAvailableCapabilities().getAvailableCapability() == null
|| netconfAugmentation.getAvailableCapabilities().getAvailableCapability().isEmpty()) {
.allMatch(availableCapabilities::contains);
}
- private NetconfNode getNodeAugmentation(Node node) {
- NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+ private NetconfNode getNodeAugmentation(final Node node) {
+ final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
if (netconfNode == null) {
LOG.warn("Node {} is not a netconf device", node.getNodeId().getValue());
return null;
*/
private List<String> initializeRequiredCapabilities() {
// Required device capabilities
-
String[] capabilityEntries = {V3PO_CAPABILITY, INTERFACES_CAPABILITY};
return Arrays.asList(capabilityEntries);
}
} catch (InterruptedException e) {
LOG.warn("Thread interrupted to ", e);
}
- attempt ++;
+ attempt++;
} while (attempt <= 3);
return Optional.absent();
};
private List<IpAddress> resolveIpAddress(Interface1 iface) {
if (iface.getIpv4() != null && iface.getIpv4().getAddress() != null) {
- return iface.getIpv4().getAddress().stream().map(ipv4 -> {
- return new IpAddress(new Ipv4Address(ipv4.getIp().getValue()));
- }).collect(Collectors.toList());
+ return iface.getIpv4().getAddress().stream().map(ipv4 ->
+ new IpAddress(new Ipv4Address(ipv4.getIp().getValue()))).collect(Collectors.toList());
} else if (iface.getIpv6() != null && iface.getIpv6().getAddress() != null) {
- return iface.getIpv6().getAddress().stream().map(ipv6 -> {
- return new IpAddress(new Ipv4Address(ipv6.getIp().getValue()));
- }).collect(Collectors.toList());
+ return iface.getIpv6().getAddress().stream().map(ipv6 ->
+ new IpAddress(new Ipv4Address(ipv6.getIp().getValue()))).collect(Collectors.toList());
}
return Lists.newArrayList();
}