Bug 6743: added futures to InterfaceManager and VppNodeManager
[groupbasedpolicy.git] / renderers / vpp / src / main / java / org / opendaylight / groupbasedpolicy / renderer / vpp / manager / VppNodeManager.java
index dda1d534ca689301946ea786506ef4d4cb312da2..0a9813400111cc53812d928ec75b876188dd2cb6 100644 (file)
@@ -10,12 +10,14 @@ package org.opendaylight.groupbasedpolicy.renderer.vpp.manager;
 
 import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.Connected;
 import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.Connecting;
+import static org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus.UnableToConnect;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -26,6 +28,14 @@ import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
@@ -37,6 +47,7 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.groupbasedpolicy.renderer.vpp.nat.NatUtil;
 import org.opendaylight.groupbasedpolicy.renderer.vpp.util.VppIidFactory;
+import org.opendaylight.groupbasedpolicy.renderer.vpp.util.VppRendererProcessingException;
 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.iana._if.type.rev140508.EthernetCsmacd;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
@@ -65,13 +76,8 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.CheckedFuture;
 
 public class VppNodeManager {
 
@@ -92,7 +98,7 @@ public class VppNodeManager {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         this.mountService = Preconditions.checkNotNull(session.getSALService(MountPointService.class));
         requiredCapabilities = initializeRequiredCapabilities();
-        if (!Strings.isNullOrEmpty(physicalInterfaces) && physicalInterfaces != NO_PUBLIC_INT_SPECIFIED) {
+        if (!Strings.isNullOrEmpty(physicalInterfaces) && !Objects.equals(physicalInterfaces, NO_PUBLIC_INT_SPECIFIED)) {
             loadPhysicalInterfaces(physicalInterfaces);
         }
     }
@@ -100,7 +106,7 @@ public class VppNodeManager {
     /**
      * Caches list of physical interfaces.
      */
-    public void loadPhysicalInterfaces(@Nonnull String physicalInterfaces) {
+    private void loadPhysicalInterfaces(@Nonnull String physicalInterfaces) {
         for (String intfOnNode : Sets.newConcurrentHashSet(Splitter.on(",").split(physicalInterfaces))) {
             List<String> entries = Lists.newArrayList(Splitter.on(":").split(intfOnNode));
             if (entries.size() != 2) {
@@ -118,109 +124,146 @@ public class VppNodeManager {
      * Synchronizes nodes to DataStore based on their modification state which results in
      * create/update/remove of Node.
      */
-    public void syncNodes(Node dataAfter, Node dataBefore) {
+    public void syncNodes(final Node dataAfter, final Node dataBefore) {
         if (isControllerConfigNode(dataAfter, dataBefore)) {
             LOG.trace("{} is ignored by VPP-renderer", CONTROLLER_CONFIG_NODE);
             return;
         }
+        ListenableFuture<String> syncFuture = Futures.immediateFuture(null);
         // New node
         if (dataBefore == null && dataAfter != null) {
-            createNode(dataAfter);
+            syncFuture = createNode(dataAfter);
         }
         // Connected/disconnected node
-        if (dataBefore != null && dataAfter != null) {
-            updateNode(dataAfter);
+        else if (dataBefore != null && dataAfter != null) {
+            syncFuture = updateNode(dataAfter);
         }
         // Removed node
-        if (dataBefore != null && dataAfter == null) {
-            removeNode(dataBefore);
+        else if (dataBefore != null) {
+            syncFuture = removeNode(dataBefore);
         }
+        Futures.addCallback(syncFuture, new FutureCallback<String>() {
+            @Override
+            public void onSuccess(@Nullable String message) {
+                LOG.info("Node synchronization completed. {} ", message);
+            }
+
+            @Override
+            public void onFailure(@Nonnull Throwable t) {
+                LOG.warn("Node synchronization failed. Data before: {} after {}", dataBefore, dataAfter);
+            }
+        });
     }
 
-    private boolean isControllerConfigNode(Node dataAfter, Node dataBefore) {
+    private boolean isControllerConfigNode(final Node dataAfter, final Node dataBefore) {
         if (dataAfter != null) {
             return CONTROLLER_CONFIG_NODE.equals(dataAfter.getNodeId());
         }
         return CONTROLLER_CONFIG_NODE.equals(dataBefore.getNodeId());
     }
 
-    private void createNode(Node node) {
-        LOG.info("Registering new node {}", node.getNodeId().getValue());
-        NetconfNode netconfNode = getNodeAugmentation(node);
+    private ListenableFuture<String> createNode(final Node node) {
+        final String nodeId = node.getNodeId().getValue();
+        LOG.info("Registering new node {}", nodeId);
+        final NetconfNode netconfNode = getNodeAugmentation(node);
         if (netconfNode == null) {
-            return;
+            final String message = String.format("Node %s is not an netconf node", nodeId);
+            return Futures.immediateFuture(message);
         }
-        NetconfNodeConnectionStatus.ConnectionStatus connectionStatus = netconfNode.getConnectionStatus();
+        final NetconfNodeConnectionStatus.ConnectionStatus connectionStatus = netconfNode.getConnectionStatus();
         switch (connectionStatus) {
-            case Connecting:
-                LOG.info("Connecting device {} ...", node.getNodeId().getValue());
-                break;
-            case Connected:
-                resolveConnectedNode(node, netconfNode);
-                break;
-            default:
-                break;
+            case Connecting: {
+                final String message = String.format("Connecting device %s ...", nodeId);
+                return Futures.immediateFuture(message);
+            }
+            case Connected: {
+                return resolveConnectedNode(node, netconfNode);
+            }
+            case UnableToConnect: {
+                final String message = String.format("Connection status is unable to connect for node %s", nodeId);
+                return Futures.immediateFuture(message);
+            }
+            default: {
+                final String message = String.format("Unknown connection status for node %s", nodeId);
+                return Futures.immediateFailedFuture(new VppRendererProcessingException(message));
+            }
         }
     }
 
-    private void updateNode(Node node) {
-        LOG.info("Updating node {}", node.getNodeId());
-        NetconfNode netconfNode = getNodeAugmentation(node);
-        if (netconfNode == null || netconfNode.getConnectionStatus() == null) {
-            return;
-        }
-        NetconfNodeConnectionStatus.ConnectionStatus afterNodeStatus = netconfNode.getConnectionStatus();
-        if (afterNodeStatus.equals(Connected)) {
-            resolveConnectedNode(node, netconfNode);
+    private ListenableFuture<String> updateNode(final Node node) {
+        final String nodeId = node.getNodeId().getValue();
+        LOG.info("Updating node {}", nodeId);
+        final NetconfNode netconfNode = getNodeAugmentation(node);
+        if (netconfNode == null) {
+            final String message = String.format("Node %s is not an netconf node", nodeId);
+            return Futures.immediateFuture(message);
         }
-        if (afterNodeStatus.equals(Connecting)) {
-            resolveDisconnectedNode(node);
-            LOG.info("Node {} is disconnected, removing from available nodes", node.getNodeId().getValue());
+        final NetconfNodeConnectionStatus.ConnectionStatus afterNodeStatus = netconfNode.getConnectionStatus();
+        if (Connected.equals(afterNodeStatus)) {
+            return resolveConnectedNode(node, netconfNode);
+        } else if (Connecting.equals(afterNodeStatus)) {
+            final String cause = String.format("Node %s is disconnected, removing from available nodes", nodeId);
+            return resolveDisconnectedNode(node, cause);
+        } else if (UnableToConnect.equals(afterNodeStatus)) {
+            final String cause = String.format("New node %s status is unable to connect, removing from available nodes",
+                    nodeId);
+            return resolveDisconnectedNode(node, cause);
+        } else {
+            final String cause = String.format("New node status is unknown. Node %s will be removed from available nodes",
+                    nodeId);
+            return resolveDisconnectedNode(node, cause);
         }
     }
 
-    private void removeNode(Node node) {
-        resolveDisconnectedNode(node);
-        LOG.info("Node {} is removed", node.getNodeId().getValue());
+    private ListenableFuture<String> removeNode(final Node node) {
+        final String cause = String.format("Node %s is removed", node.getNodeId().getValue());
+        return resolveDisconnectedNode(node, cause);
     }
 
-    private void resolveConnectedNode(Node node, NetconfNode netconfNode) {
-        InstanceIdentifier<Node> mountPointIid = getMountpointIid(node);
-        RendererNode rendererNode = remapNode(mountPointIid);
+    private ListenableFuture<String> resolveConnectedNode(final Node node, final NetconfNode netconfNode) {
+        final String nodeId = node.getNodeId().getValue();
+        final InstanceIdentifier<Node> mountPointIid = getMountpointIid(node);
+        final RendererNode rendererNode = remapNode(mountPointIid);
         if (!isCapableNetconfDevice(node, netconfNode)) {
-            LOG.warn("Node {} is not connected.", node.getNodeId().getValue());
-            return;
+            final String message = String.format("Node %s is not connected", nodeId);
+            return Futures.immediateFuture(message);
         }
         final DataBroker mountpoint = getNodeMountPoint(mountPointIid);
         if (mountpoint == null) {
-            LOG.warn("Mountpoint not available for node {}", node.getNodeId().getValue());
-            return;
+            final String message = String.format("Mountpoint not available for node %s", nodeId);
+            return Futures.immediateFuture(message);
         }
         final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
         wTx.put(LogicalDatastoreType.OPERATIONAL, VppIidFactory.getRendererNodeIid(rendererNode), rendererNode, true);
-        DataStoreHelper.submitToDs(wTx);
-        LOG.info("Node {} is capable and ready.", node.getNodeId().getValue());
-        syncPhysicalInterfacesInLocalDs(mountpoint, mountPointIid);
-        NatUtil.resolveOutboundNatInterface(mountpoint, mountPointIid, node.getNodeId(), extInterfaces);
+        final boolean submit = DataStoreHelper.submitToDs(wTx);
+        if (submit) {
+            final String message = String.format("Node %s is capable and ready", nodeId);
+            syncPhysicalInterfacesInLocalDs(mountpoint, mountPointIid);
+            NatUtil.resolveOutboundNatInterface(mountpoint, mountPointIid, node.getNodeId(), extInterfaces);
+            return Futures.immediateFuture(message);
+        } else {
+            final String message = String.format("Failed to resolve connected node %s", nodeId);
+            return Futures.immediateFuture(message);
+        }
     }
 
-    private void resolveDisconnectedNode(Node node) {
-        InstanceIdentifier<Node> mountPointIid = getMountpointIid(node);
-        RendererNode rendererNode = remapNode(mountPointIid);
+    private ListenableFuture<String> resolveDisconnectedNode(final Node node, final String cause) {
+        final InstanceIdentifier<Node> mountPointIid = getMountpointIid(node);
+        final RendererNode rendererNode = remapNode(mountPointIid);
         final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
         wTx.delete(LogicalDatastoreType.OPERATIONAL, VppIidFactory.getRendererNodeIid(rendererNode));
-        CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
+        final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture = wTx.submit();
         try {
-            submitFuture.checkedGet();
+            checkedFuture.checkedGet();
+            return Futures.immediateFuture(cause);
         } catch (TransactionCommitFailedException e) {
-            LOG.error("Write transaction failed to {}", e.getMessage());
-        } catch (Exception e) {
-            LOG.error("Failed to .. {}", e.getMessage());
+            final String message = String.format("Failed to resolve disconnected node %s", node.getNodeId().getValue());
+            return Futures.immediateFailedFuture(new VppRendererProcessingException(message));
         }
     }
 
     @Nullable
-    private DataBroker getNodeMountPoint(InstanceIdentifier<Node> mountPointIid) {
+    private DataBroker getNodeMountPoint(final InstanceIdentifier<Node> mountPointIid) {
         final Future<Optional<MountPoint>> futureOptionalObject = getMountpointFromSal(mountPointIid);
         try {
             final Optional<MountPoint> optionalObject = futureOptionalObject.get();
@@ -246,20 +289,20 @@ public class VppNodeManager {
         }
     }
 
-    private RendererNode remapNode(InstanceIdentifier<Node> path) {
-        RendererNodeBuilder rendererNodeBuilder = new RendererNodeBuilder();
+    private RendererNode remapNode(final InstanceIdentifier<Node> path) {
+        final RendererNodeBuilder rendererNodeBuilder = new RendererNodeBuilder();
         rendererNodeBuilder.setKey(new RendererNodeKey(path)).setNodePath(path);
         return rendererNodeBuilder.build();
     }
 
-    private InstanceIdentifier<Node> getMountpointIid(Node node) {
+    private InstanceIdentifier<Node> getMountpointIid(final Node node) {
         return InstanceIdentifier.builder(NetworkTopology.class)
-            .child(Topology.class, new TopologyKey(TOPOLOGY_ID))
-            .child(Node.class, new NodeKey(node.getNodeId()))
-            .build();
+                .child(Topology.class, new TopologyKey(TOPOLOGY_ID))
+                .child(Node.class, new NodeKey(node.getNodeId()))
+                .build();
     }
 
-    private boolean isCapableNetconfDevice(Node node, NetconfNode netconfAugmentation) {
+    private boolean isCapableNetconfDevice(final Node node, final NetconfNode netconfAugmentation) {
         if (netconfAugmentation.getAvailableCapabilities() == null
                 || netconfAugmentation.getAvailableCapabilities().getAvailableCapability() == null
                 || netconfAugmentation.getAvailableCapabilities().getAvailableCapability().isEmpty()) {
@@ -281,8 +324,8 @@ public class VppNodeManager {
                 .allMatch(availableCapabilities::contains);
     }
 
-    private NetconfNode getNodeAugmentation(Node node) {
-        NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+    private NetconfNode getNodeAugmentation(final Node node) {
+        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
         if (netconfNode == null) {
             LOG.warn("Node {} is not a netconf device", node.getNodeId().getValue());
             return null;
@@ -301,7 +344,6 @@ public class VppNodeManager {
      */
     private List<String> initializeRequiredCapabilities() {
         // Required device capabilities
-
         String[] capabilityEntries = {V3PO_CAPABILITY, INTERFACES_CAPABILITY};
         return Arrays.asList(capabilityEntries);
     }
@@ -326,7 +368,7 @@ public class VppNodeManager {
                 } catch (InterruptedException e) {
                     LOG.warn("Thread interrupted to ", e);
                 }
-                attempt ++;
+                attempt++;
             } while (attempt <= 3);
             return Optional.absent();
         };
@@ -384,13 +426,11 @@ public class VppNodeManager {
 
     private List<IpAddress> resolveIpAddress(Interface1 iface) {
         if (iface.getIpv4() != null && iface.getIpv4().getAddress() != null) {
-            return iface.getIpv4().getAddress().stream().map(ipv4 -> {
-                return new IpAddress(new Ipv4Address(ipv4.getIp().getValue()));
-            }).collect(Collectors.toList());
+            return iface.getIpv4().getAddress().stream().map(ipv4 ->
+                    new IpAddress(new Ipv4Address(ipv4.getIp().getValue()))).collect(Collectors.toList());
         } else if (iface.getIpv6() != null && iface.getIpv6().getAddress() != null) {
-            return iface.getIpv6().getAddress().stream().map(ipv6 -> {
-                return new IpAddress(new Ipv4Address(ipv6.getIp().getValue()));
-            }).collect(Collectors.toList());
+            return iface.getIpv6().getAddress().stream().map(ipv6 ->
+                    new IpAddress(new Ipv4Address(ipv6.getIp().getValue()))).collect(Collectors.toList());
         }
         return Lists.newArrayList();
     }