update deprecated transform and addCallback methods
[groupbasedpolicy.git] / renderers / vpp / src / main / java / org / opendaylight / groupbasedpolicy / renderer / vpp / manager / VppNodeManager.java
index fb2c35aa0481fd8afd68ee73ae9b15709ffcdd82..c2bfe42b6013858f4cbd53b6a02fc7385a3ac2ac 100644 (file)
@@ -18,18 +18,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.MountPoint;
 import org.opendaylight.controller.md.sal.binding.api.MountPointService;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
@@ -41,6 +35,7 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
 import org.opendaylight.groupbasedpolicy.renderer.vpp.lisp.info.container.HostRelatedInfoContainer;
 import org.opendaylight.groupbasedpolicy.renderer.vpp.lisp.info.container.states.PhysicalInterfaces;
 import org.opendaylight.groupbasedpolicy.renderer.vpp.nat.NatUtil;
+import org.opendaylight.groupbasedpolicy.renderer.vpp.util.MountedDataBrokerProvider;
 import org.opendaylight.groupbasedpolicy.renderer.vpp.util.VppIidFactory;
 import org.opendaylight.groupbasedpolicy.renderer.vpp.util.VppRendererProcessingException;
 import org.opendaylight.groupbasedpolicy.util.DataStoreHelper;
@@ -82,10 +77,10 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public class VppNodeManager {
 
-    private static final short DURATION = 3000;
     private static final TopologyId TOPOLOGY_ID = new TopologyId("topology-netconf");
     private static final Logger LOG = LoggerFactory.getLogger(VppNodeManager.class);
     private static final String V3PO_CAPABILITY = "(urn:opendaylight:params:xml:ns:yang:v3po?revision=2017-06-07)v3po";
@@ -98,11 +93,13 @@ public class VppNodeManager {
     private final List<String> requiredCapabilities;
     private final MountPointService mountService;
     private final HostRelatedInfoContainer hostRelatedInfoContainer = HostRelatedInfoContainer.getInstance();
+    private final MountedDataBrokerProvider mountProvider;
 
     public VppNodeManager(@Nonnull final DataBroker dataBroker,
             @Nonnull final BindingAwareBroker.ProviderContext session, @Nullable String physicalInterfaces) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
         this.mountService = Preconditions.checkNotNull(session.getSALService(MountPointService.class));
+        this.mountProvider = new MountedDataBrokerProvider(mountService, dataBroker);
         requiredCapabilities = initializeRequiredCapabilities();
         if (!Strings.isNullOrEmpty(physicalInterfaces) && !Objects.equals(physicalInterfaces, NO_PUBLIC_INT_SPECIFIED)) {
             loadPhysicalInterfaces(physicalInterfaces);
@@ -129,6 +126,8 @@ public class VppNodeManager {
     /**
      * Synchronizes nodes to DataStore based on their modification state which results in
      * create/update/remove of Node.
+     * @param dataAfter data after modification
+     * @param dataBefore data Before modification
      */
     public void syncNodes(final Node dataAfter, final Node dataBefore) {
         if (isControllerConfigNode(dataAfter, dataBefore)) {
@@ -158,7 +157,7 @@ public class VppNodeManager {
             public void onFailure(@Nonnull Throwable t) {
                 LOG.warn("Node synchronization failed. Data before: {} after {}", dataBefore, dataAfter);
             }
-        });
+        }, MoreExecutors.directExecutor());
     }
 
     private boolean isControllerConfigNode(final Node dataAfter, final Node dataBefore) {
@@ -234,8 +233,8 @@ public class VppNodeManager {
             final String message = String.format("Node %s is not connected", nodeId);
             return Futures.immediateFuture(message);
         }
-        final DataBroker mountpoint = getNodeMountPoint(mountPointIid);
-        if (mountpoint == null) {
+        final Optional<DataBroker> mountpoint = mountProvider.resolveDataBrokerForMountPoint(mountPointIid);
+        if (mountpoint.isPresent()) {
             final String message = String.format("Mountpoint not available for node %s", nodeId);
             return Futures.immediateFuture(message);
         }
@@ -244,8 +243,8 @@ public class VppNodeManager {
         final boolean submit = DataStoreHelper.submitToDs(wTx);
         if (submit) {
             final String message = String.format("Node %s is capable and ready", nodeId);
-            syncPhysicalInterfacesInLocalDs(mountpoint, mountPointIid);
-            NatUtil.resolveOutboundNatInterface(mountpoint, mountPointIid, node.getNodeId(), extInterfaces);
+            syncPhysicalInterfacesInLocalDs(mountpoint.get(), mountPointIid);
+            NatUtil.resolveOutboundNatInterface(mountPointIid, node.getNodeId(), extInterfaces);
             return Futures.immediateFuture(message);
         } else {
             final String message = String.format("Failed to resolve connected node %s", nodeId);
@@ -258,7 +257,6 @@ public class VppNodeManager {
         final RendererNode rendererNode = remapNode(mountPointIid);
         final WriteTransaction wTx = dataBroker.newWriteOnlyTransaction();
         wTx.delete(LogicalDatastoreType.OPERATIONAL, VppIidFactory.getRendererNodeIid(rendererNode));
-        extInterfaces.remove(node.getNodeId());
         final CheckedFuture<Void, TransactionCommitFailedException> checkedFuture = wTx.submit();
         try {
             checkedFuture.checkedGet();
@@ -269,31 +267,6 @@ public class VppNodeManager {
         }
     }
 
-    @Nullable
-    private DataBroker getNodeMountPoint(final InstanceIdentifier<Node> mountPointIid) {
-        final Future<Optional<MountPoint>> futureOptionalObject = getMountpointFromSal(mountPointIid);
-        try {
-            final Optional<MountPoint> optionalObject = futureOptionalObject.get();
-            LOG.debug("Optional mountpoint object: {}", optionalObject);
-            MountPoint mountPoint;
-            if (optionalObject.isPresent()) {
-                mountPoint = optionalObject.get();
-                Optional<DataBroker> optionalDataBroker = mountPoint.getService(DataBroker.class);
-                if (optionalDataBroker.isPresent()) {
-                    return optionalDataBroker.get();
-                } else {
-                    LOG.warn("Cannot obtain data broker from mountpoint {}", mountPoint);
-                }
-            } else {
-                LOG.warn("Cannot obtain mountpoint with IID {}", mountPointIid);
-            }
-            return null;
-        } catch (ExecutionException | InterruptedException e) {
-            LOG.warn("Unable to obtain mountpoint ... {}", e);
-            return null;
-        }
-    }
-
     private RendererNode remapNode(final InstanceIdentifier<Node> path) {
         final RendererNodeBuilder rendererNodeBuilder = new RendererNodeBuilder();
         rendererNodeBuilder.setKey(new RendererNodeKey(path)).setNodePath(path);
@@ -353,33 +326,6 @@ public class VppNodeManager {
         return Arrays.asList(capabilityEntries);
     }
 
-    // TODO bug 7699
-    // This works as a workaround for mountpoint registration in cluster. If application is registered on different
-    // node as netconf service, it obtains mountpoint registered by SlaveSalFacade (instead of MasterSalFacade). However
-    // this service registers mountpoint a moment later then connectionStatus is set to "Connected". If NodeManager hits
-    // state where device is connected but mountpoint is not yet available, try to get it again in a while
-    private Future<Optional<MountPoint>> getMountpointFromSal(final InstanceIdentifier<Node> iid) {
-        final ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final Callable<Optional<MountPoint>> task = () -> {
-            byte attempt = 0;
-            do {
-                try {
-                    final Optional<MountPoint> optionalMountpoint = mountService.getMountPoint(iid);
-                    if (optionalMountpoint.isPresent()) {
-                        return optionalMountpoint;
-                    }
-                    LOG.warn("Mountpoint {} is not registered yet", iid);
-                    Thread.sleep(DURATION);
-                } catch (InterruptedException e) {
-                    LOG.warn("Thread interrupted to ", e);
-                }
-                attempt++;
-            } while (attempt <= 3);
-            return Optional.absent();
-        };
-        return executorService.submit(task);
-    }
-
     private void syncPhysicalInterfacesInLocalDs(DataBroker mountPointDataBroker, InstanceIdentifier<Node> nodeIid) {
         ReadWriteTransaction rwTx = dataBroker.newReadWriteTransaction();
         ReadOnlyTransaction rTx = mountPointDataBroker.newReadOnlyTransaction();
@@ -475,7 +421,7 @@ public class VppNodeManager {
                 java.util.Optional<PhysicalInterface> pubInt = rn.getAugmentation(VppInterfaceAugmentation.class)
                     .getPhysicalInterface()
                     .stream()
-                    .filter(phInt -> phInt.isExternal())
+                    .filter(PhysicalInterface::isExternal)
                     .findFirst();
                 if (pubInt.isPresent()) {
                     nodes.put(rn.getNodePath().firstKeyOf(Node.class).getNodeId(), pubInt.get().getInterfaceName());