X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=renderers%2Fvpp%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fgroupbasedpolicy%2Frenderer%2Fvpp%2Fmanager%2FVppNodeManager.java;h=0a9813400111cc53812d928ec75b876188dd2cb6;hb=35b36cec1f8a386c2dbbba06cb585b21ea1ba7a8;hp=dda1d534ca689301946ea786506ef4d4cb312da2;hpb=90dcc8c4d1c0e6b0131dfbf36892ade663d78086;p=groupbasedpolicy.git diff --git a/renderers/vpp/src/main/java/org/opendaylight/groupbasedpolicy/renderer/vpp/manager/VppNodeManager.java b/renderers/vpp/src/main/java/org/opendaylight/groupbasedpolicy/renderer/vpp/manager/VppNodeManager.java index dda1d534c..0a9813400 100644 --- a/renderers/vpp/src/main/java/org/opendaylight/groupbasedpolicy/renderer/vpp/manager/VppNodeManager.java +++ b/renderers/vpp/src/main/java/org/opendaylight/groupbasedpolicy/renderer/vpp/manager/VppNodeManager.java @@ -10,12 +10,14 @@ package org.opendaylight.groupbasedpolicy.renderer.vpp.manager; import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.Connected; import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.Connecting; +import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.UnableToConnect; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -26,6 +28,14 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.MountPoint; import org.opendaylight.controller.md.sal.binding.api.MountPointService; @@ -37,6 +47,7 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.groupbasedpolicy.renderer.vpp.nat.NatUtil; import org.opendaylight.groupbasedpolicy.renderer.vpp.util.VppIidFactory; +import org.opendaylight.groupbasedpolicy.renderer.vpp.util.VppRendererProcessingException; import org.opendaylight.groupbasedpolicy.util.DataStoreHelper; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana._if.type.rev140508.EthernetCsmacd; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress; @@ -65,13 +76,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.CheckedFuture; public class VppNodeManager { @@ -92,7 +98,7 @@ public class VppNodeManager { this.dataBroker = Preconditions.checkNotNull(dataBroker); this.mountService = Preconditions.checkNotNull(session.getSALService(MountPointService.class)); requiredCapabilities = initializeRequiredCapabilities(); - if (!Strings.isNullOrEmpty(physicalInterfaces) && physicalInterfaces != NO_PUBLIC_INT_SPECIFIED) { + if (!Strings.isNullOrEmpty(physicalInterfaces) && !Objects.equals(physicalInterfaces, NO_PUBLIC_INT_SPECIFIED)) { loadPhysicalInterfaces(physicalInterfaces); } } @@ -100,7 +106,7 @@ public class VppNodeManager { /** * Caches list of physical interfaces. */ - public void loadPhysicalInterfaces(@Nonnull String physicalInterfaces) { + private void loadPhysicalInterfaces(@Nonnull String physicalInterfaces) { for (String intfOnNode : Sets.newConcurrentHashSet(Splitter.on(",").split(physicalInterfaces))) { List entries = Lists.newArrayList(Splitter.on(":").split(intfOnNode)); if (entries.size() != 2) { @@ -118,109 +124,146 @@ public class VppNodeManager { * Synchronizes nodes to DataStore based on their modification state which results in * create/update/remove of Node. */ - public void syncNodes(Node dataAfter, Node dataBefore) { + public void syncNodes(final Node dataAfter, final Node dataBefore) { if (isControllerConfigNode(dataAfter, dataBefore)) { LOG.trace("{} is ignored by VPP-renderer", CONTROLLER_CONFIG_NODE); return; } + ListenableFuture syncFuture = Futures.immediateFuture(null); // New node if (dataBefore == null && dataAfter != null) { - createNode(dataAfter); + syncFuture = createNode(dataAfter); } // Connected/disconnected node - if (dataBefore != null && dataAfter != null) { - updateNode(dataAfter); + else if (dataBefore != null && dataAfter != null) { + syncFuture = updateNode(dataAfter); } // Removed node - if (dataBefore != null && dataAfter == null) { - removeNode(dataBefore); + else if (dataBefore != null) { + syncFuture = removeNode(dataBefore); } + Futures.addCallback(syncFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable String message) { + LOG.info("Node synchronization completed. {} ", message); + } + + @Override + public void onFailure(@Nonnull Throwable t) { + LOG.warn("Node synchronization failed. Data before: {} after {}", dataBefore, dataAfter); + } + }); } - private boolean isControllerConfigNode(Node dataAfter, Node dataBefore) { + private boolean isControllerConfigNode(final Node dataAfter, final Node dataBefore) { if (dataAfter != null) { return CONTROLLER_CONFIG_NODE.equals(dataAfter.getNodeId()); } return CONTROLLER_CONFIG_NODE.equals(dataBefore.getNodeId()); } - private void createNode(Node node) { - LOG.info("Registering new node {}", node.getNodeId().getValue()); - NetconfNode netconfNode = getNodeAugmentation(node); + private ListenableFuture createNode(final Node node) { + final String nodeId = node.getNodeId().getValue(); + LOG.info("Registering new node {}", nodeId); + final NetconfNode netconfNode = getNodeAugmentation(node); if (netconfNode == null) { - return; + final String message = String.format("Node %s is not an netconf node", nodeId); + return Futures.immediateFuture(message); } - NetconfNodeConnectionStatus.ConnectionStatus connectionStatus = netconfNode.getConnectionStatus(); + final NetconfNodeConnectionStatus.ConnectionStatus connectionStatus = netconfNode.getConnectionStatus(); switch (connectionStatus) { - case Connecting: - LOG.info("Connecting device {} ...", node.getNodeId().getValue()); - break; - case Connected: - resolveConnectedNode(node, netconfNode); - break; - default: - break; + case Connecting: { + final String message = String.format("Connecting device %s ...", nodeId); + return Futures.immediateFuture(message); + } + case Connected: { + return resolveConnectedNode(node, netconfNode); + } + case UnableToConnect: { + final String message = String.format("Connection status is unable to connect for node %s", nodeId); + return Futures.immediateFuture(message); + } + default: { + final String message = String.format("Unknown connection status for node %s", nodeId); + return Futures.immediateFailedFuture(new VppRendererProcessingException(message)); + } } } - private void updateNode(Node node) { - LOG.info("Updating node {}", node.getNodeId()); - NetconfNode netconfNode = getNodeAugmentation(node); - if (netconfNode == null || netconfNode.getConnectionStatus() == null) { - return; - } - NetconfNodeConnectionStatus.ConnectionStatus afterNodeStatus = netconfNode.getConnectionStatus(); - if (afterNodeStatus.equals(Connected)) { - resolveConnectedNode(node, netconfNode); + private ListenableFuture updateNode(final Node node) { + final String nodeId = node.getNodeId().getValue(); + LOG.info("Updating node {}", nodeId); + final NetconfNode netconfNode = getNodeAugmentation(node); + if (netconfNode == null) { + final String message = String.format("Node %s is not an netconf node", nodeId); + return Futures.immediateFuture(message); } - if (afterNodeStatus.equals(Connecting)) { - resolveDisconnectedNode(node); - LOG.info("Node {} is disconnected, removing from available nodes", node.getNodeId().getValue()); + final NetconfNodeConnectionStatus.ConnectionStatus afterNodeStatus = netconfNode.getConnectionStatus(); + if (Connected.equals(afterNodeStatus)) { + return resolveConnectedNode(node, netconfNode); + } else if (Connecting.equals(afterNodeStatus)) { + final String cause = String.format("Node %s is disconnected, removing from available nodes", nodeId); + return resolveDisconnectedNode(node, cause); + } else if (UnableToConnect.equals(afterNodeStatus)) { + final String cause = String.format("New node %s status is unable to connect, removing from available nodes", + nodeId); + return resolveDisconnectedNode(node, cause); + } else { + final String cause = String.format("New node status is unknown. Node %s will be removed from available nodes", + nodeId); + return resolveDisconnectedNode(node, cause); } } - private void removeNode(Node node) { - resolveDisconnectedNode(node); - LOG.info("Node {} is removed", node.getNodeId().getValue()); + private ListenableFuture removeNode(final Node node) { + final String cause = String.format("Node %s is removed", node.getNodeId().getValue()); + return resolveDisconnectedNode(node, cause); } - private void resolveConnectedNode(Node node, NetconfNode netconfNode) { - InstanceIdentifier mountPointIid = getMountpointIid(node); - RendererNode rendererNode = remapNode(mountPointIid); + private ListenableFuture resolveConnectedNode(final Node node, final NetconfNode netconfNode) { + final String nodeId = node.getNodeId().getValue(); + final InstanceIdentifier mountPointIid = getMountpointIid(node); + final RendererNode rendererNode = remapNode(mountPointIid); if (!isCapableNetconfDevice(node, netconfNode)) { - LOG.warn("Node {} is not connected.", node.getNodeId().getValue()); - return; + final String message = String.format("Node %s is not connected", nodeId); + return Futures.immediateFuture(message); } final DataBroker mountpoint = getNodeMountPoint(mountPointIid); if (mountpoint == null) { - LOG.warn("Mountpoint not available for node {}", node.getNodeId().getValue()); - return; + final String message = String.format("Mountpoint not available for node %s", nodeId); + return Futures.immediateFuture(message); } final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction(); wTx.put(LogicalDatastoreType.OPERATIONAL, VppIidFactory.getRendererNodeIid(rendererNode), rendererNode, true); - DataStoreHelper.submitToDs(wTx); - LOG.info("Node {} is capable and ready.", node.getNodeId().getValue()); - syncPhysicalInterfacesInLocalDs(mountpoint, mountPointIid); - NatUtil.resolveOutboundNatInterface(mountpoint, mountPointIid, node.getNodeId(), extInterfaces); + final boolean submit = DataStoreHelper.submitToDs(wTx); + if (submit) { + final String message = String.format("Node %s is capable and ready", nodeId); + syncPhysicalInterfacesInLocalDs(mountpoint, mountPointIid); + NatUtil.resolveOutboundNatInterface(mountpoint, mountPointIid, node.getNodeId(), extInterfaces); + return Futures.immediateFuture(message); + } else { + final String message = String.format("Failed to resolve connected node %s", nodeId); + return Futures.immediateFuture(message); + } } - private void resolveDisconnectedNode(Node node) { - InstanceIdentifier mountPointIid = getMountpointIid(node); - RendererNode rendererNode = remapNode(mountPointIid); + private ListenableFuture resolveDisconnectedNode(final Node node, final String cause) { + final InstanceIdentifier mountPointIid = getMountpointIid(node); + final RendererNode rendererNode = remapNode(mountPointIid); final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction(); wTx.delete(LogicalDatastoreType.OPERATIONAL, VppIidFactory.getRendererNodeIid(rendererNode)); - CheckedFuture submitFuture = wTx.submit(); + final CheckedFuture checkedFuture = wTx.submit(); try { - submitFuture.checkedGet(); + checkedFuture.checkedGet(); + return Futures.immediateFuture(cause); } catch (TransactionCommitFailedException e) { - LOG.error("Write transaction failed to {}", e.getMessage()); - } catch (Exception e) { - LOG.error("Failed to .. {}", e.getMessage()); + final String message = String.format("Failed to resolve disconnected node %s", node.getNodeId().getValue()); + return Futures.immediateFailedFuture(new VppRendererProcessingException(message)); } } @Nullable - private DataBroker getNodeMountPoint(InstanceIdentifier mountPointIid) { + private DataBroker getNodeMountPoint(final InstanceIdentifier mountPointIid) { final Future> futureOptionalObject = getMountpointFromSal(mountPointIid); try { final Optional optionalObject = futureOptionalObject.get(); @@ -246,20 +289,20 @@ public class VppNodeManager { } } - private RendererNode remapNode(InstanceIdentifier path) { - RendererNodeBuilder rendererNodeBuilder = new RendererNodeBuilder(); + private RendererNode remapNode(final InstanceIdentifier path) { + final RendererNodeBuilder rendererNodeBuilder = new RendererNodeBuilder(); rendererNodeBuilder.setKey(new RendererNodeKey(path)).setNodePath(path); return rendererNodeBuilder.build(); } - private InstanceIdentifier getMountpointIid(Node node) { + private InstanceIdentifier getMountpointIid(final Node node) { return InstanceIdentifier.builder(NetworkTopology.class) - .child(Topology.class, new TopologyKey(TOPOLOGY_ID)) - .child(Node.class, new NodeKey(node.getNodeId())) - .build(); + .child(Topology.class, new TopologyKey(TOPOLOGY_ID)) + .child(Node.class, new NodeKey(node.getNodeId())) + .build(); } - private boolean isCapableNetconfDevice(Node node, NetconfNode netconfAugmentation) { + private boolean isCapableNetconfDevice(final Node node, final NetconfNode netconfAugmentation) { if (netconfAugmentation.getAvailableCapabilities() == null || netconfAugmentation.getAvailableCapabilities().getAvailableCapability() == null || netconfAugmentation.getAvailableCapabilities().getAvailableCapability().isEmpty()) { @@ -281,8 +324,8 @@ public class VppNodeManager { .allMatch(availableCapabilities::contains); } - private NetconfNode getNodeAugmentation(Node node) { - NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); + private NetconfNode getNodeAugmentation(final Node node) { + final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class); if (netconfNode == null) { LOG.warn("Node {} is not a netconf device", node.getNodeId().getValue()); return null; @@ -301,7 +344,6 @@ public class VppNodeManager { */ private List initializeRequiredCapabilities() { // Required device capabilities - String[] capabilityEntries = {V3PO_CAPABILITY, INTERFACES_CAPABILITY}; return Arrays.asList(capabilityEntries); } @@ -326,7 +368,7 @@ public class VppNodeManager { } catch (InterruptedException e) { LOG.warn("Thread interrupted to ", e); } - attempt ++; + attempt++; } while (attempt <= 3); return Optional.absent(); }; @@ -384,13 +426,11 @@ public class VppNodeManager { private List resolveIpAddress(Interface1 iface) { if (iface.getIpv4() != null && iface.getIpv4().getAddress() != null) { - return iface.getIpv4().getAddress().stream().map(ipv4 -> { - return new IpAddress(new Ipv4Address(ipv4.getIp().getValue())); - }).collect(Collectors.toList()); + return iface.getIpv4().getAddress().stream().map(ipv4 -> + new IpAddress(new Ipv4Address(ipv4.getIp().getValue()))).collect(Collectors.toList()); } else if (iface.getIpv6() != null && iface.getIpv6().getAddress() != null) { - return iface.getIpv6().getAddress().stream().map(ipv6 -> { - return new IpAddress(new Ipv4Address(ipv6.getIp().getValue())); - }).collect(Collectors.toList()); + return iface.getIpv6().getAddress().stream().map(ipv6 -> + new IpAddress(new Ipv4Address(ipv6.getIp().getValue()))).collect(Collectors.toList()); } return Lists.newArrayList(); }