Bug 8902 - Changes in FRM for Reconciliation Framework
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / FlowNodeReconciliationImpl.java
index 788199d2810cc4254c72d9dae3b81d553e611e27..0e4780d8d34fdbbe5bb481cf9c6d968a96301d78 100644 (file)
@@ -12,7 +12,6 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -26,19 +25,21 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.action.types.rev131112.action.action.GroupActionCase;
@@ -59,6 +60,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.Buckets;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.group.buckets.Bucket;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
@@ -70,7 +72,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.MeterId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupTypes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.ControlBundleInput;
@@ -91,8 +92,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.on
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleControlType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -117,6 +119,11 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
 
     private final DataBroker dataBroker;
     private final ForwardingRulesManager provider;
+    private final String serviceName;
+    final private int priority;
+    final private ResultState resultState;
+    private Map<DeviceInfo, ListenableFuture<Boolean>> futureMap = new HashMap<>();
+
     private final ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
 
     private final SalBundleService salBundleService;
@@ -124,9 +131,13 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
     private static final AtomicLong BUNDLE_ID = new AtomicLong();
     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
 
-    public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db) {
+    public FlowNodeReconciliationImpl (final ForwardingRulesManager manager, final DataBroker db,
+            final String serviceName, final int priority, final ResultState resultState) {
         this.provider = Preconditions.checkNotNull(manager, "ForwardingRulesManager can not be null!");
         dataBroker = Preconditions.checkNotNull(db, "DataBroker can not be null!");
+        this.serviceName = serviceName;
+        this.priority = priority;
+        this.resultState = resultState;
         salBundleService = Preconditions.checkNotNull(manager.getSalBundleService(),"salBundleService can not be null!");
     }
 
@@ -137,32 +148,25 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         }
     }
 
-    @Override
-    public void reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode) {
-        if (provider.isReconciliationDisabled()) {
-            LOG.debug("Reconciliation is disabled by user. Skipping reconciliation of node : {}", connectedNode
-                    .firstKeyOf(Node.class));
-            return;
+    private ListenableFuture<Boolean> reconcileConfiguration(InstanceIdentifier<FlowCapableNode> connectedNode) {
+        LOG.info("Triggering reconciliation for device {}", connectedNode.firstKeyOf(Node.class));
+        if (provider.isStaleMarkingEnabled()) {
+            LOG.info("Stale-Marking is ENABLED and proceeding with deletion of "
+                    + "stale-marked entities on switch {}",
+                    connectedNode.toString());
+            reconciliationPreProcess(connectedNode);
         }
-        if (provider.isNodeOwner(connectedNode)) {
-            LOG.info("Triggering reconciliation for device {}", connectedNode.firstKeyOf(Node.class));
-            if (provider.isStaleMarkingEnabled()) {
-                LOG.info("Stale-Marking is ENABLED and proceeding with deletion of stale-marked entities on switch {}",
-                        connectedNode.toString());
-                reconciliationPreProcess(connectedNode);
-            }
-            LOG.debug("Bundle based reconciliation status : {}", provider.isBundleBasedReconciliationEnabled()?"Enable":"Disable");
-            if (provider.isBundleBasedReconciliationEnabled()) {
-                BundleBasedReconciliationTask bundleBasedReconTask = new BundleBasedReconciliationTask(connectedNode);
-                executor.execute(bundleBasedReconTask);
-            } else {
-                ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode);
-                executor.execute(reconciliationTask);
-            }
+        LOG.debug("Bundle based reconciliation status : {}", provider.isBundleBasedReconciliationEnabled()?"Enable":"Disable");
+        if (provider.isBundleBasedReconciliationEnabled()) {
+            BundleBasedReconciliationTask bundleBasedReconTask = new BundleBasedReconciliationTask(connectedNode);
+            return JdkFutureAdapters.listenInPoolThread(executor.submit(bundleBasedReconTask));
+        } else {
+            ReconciliationTask reconciliationTask = new ReconciliationTask(connectedNode);
+            return JdkFutureAdapters.listenInPoolThread(executor.submit(reconciliationTask));
         }
     }
 
-    private class BundleBasedReconciliationTask implements Runnable {
+    private class BundleBasedReconciliationTask implements Callable<Boolean> {
         final InstanceIdentifier<FlowCapableNode> nodeIdentity;
 
         public BundleBasedReconciliationTask(final InstanceIdentifier<FlowCapableNode> nodeIdent) {
@@ -170,7 +174,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
         }
 
         @Override
-        public void run() {
+        public Boolean call() {
             String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
             Optional<FlowCapableNode> flowNode = Optional.absent();
             BundleId bundleIdValue = new BundleId(BUNDLE_ID.getAndIncrement());
@@ -231,25 +235,66 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
                 /* Bundles not supported for meters*/
                 List<Meter> meters = flowNode.get().getMeter() != null
                         ? flowNode.get().getMeter() : Collections.emptyList();
-                ListenableFuture<RpcResult<Void>> meterFuture =
-                        Futures.transformAsync(commitBundleFuture, rpcResult -> {
-                            if (rpcResult.isSuccessful()) {
-                                for (Meter meter : meters) {
-                                    final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
-                                            nodeIdentity.child(Meter.class, meter.getKey());
-                                    provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
-                                }
+                        ListenableFuture<RpcResult<Void>> meterFuture =
+                                Futures.transformAsync(commitBundleFuture, rpcResult -> {
+                                    if (rpcResult.isSuccessful()) {
+                                        for (Meter meter : meters) {
+                                            final KeyedInstanceIdentifier<Meter, MeterKey> meterIdent =
+                                                    nodeIdentity.child(Meter.class, meter.getKey());
+                                            provider.getMeterCommiter().add(meterIdent, meter, nodeIdentity);
+                                        }
+                                    }
+                                    return Futures.immediateFuture(null);
+                                });
+
+                        trans.close();
+                        try {
+                            if(commitBundleFuture.get().isSuccessful()) {
+                                LOG.debug("Completing bundle based reconciliation for device ID:{}", nDpId);
+                                return true;
+                            } else {
+                                return false;
                             }
-                            return Futures.immediateFuture(null);
-                        });
-
-                LOG.debug("Completing bundle based reconciliation for device ID:{}", nDpId);
-                trans.close();
+                        } catch (InterruptedException | ExecutionException e) {
+                            LOG.error("Error while doing bundle based reconciliation for device ID:{}", nodeIdentity);
+                            return false;
+                        }
             }
+            LOG.error("FlowNode not present for Datapath ID {}", nDpId);
+            return false;
         }
     }
 
-    private class ReconciliationTask implements Runnable {
+    @Override
+    public ListenableFuture<Boolean> startReconciliation(DeviceInfo node) {
+        InstanceIdentifier<FlowCapableNode> connectedNode = node.getNodeInstanceIdentifier()
+                .augmentation(FlowCapableNode.class);
+        return futureMap.computeIfAbsent(node, future -> reconcileConfiguration(connectedNode));
+    }
+
+    @Override
+    public ListenableFuture<Boolean> endReconciliation(DeviceInfo node) {
+        futureMap.computeIfPresent(node, (key, future) -> future).cancel(true);
+        futureMap.remove(node);
+        return Futures.immediateFuture(true);
+    }
+
+    @Override
+    public int getPriority() {
+        return priority;
+    }
+
+    @Override
+    public String getName() {
+        return serviceName;
+    }
+
+    @Override
+    public ResultState getResultState() {
+        return resultState;
+    }
+
+    private class ReconciliationTask implements Callable<Boolean> {
 
         InstanceIdentifier<FlowCapableNode> nodeIdentity;
 
@@ -257,20 +302,19 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
            nodeIdentity = nodeIdent;
         }
 
-        @Override
-        public void run() {
+        public Boolean call() {
             String sNode = nodeIdentity.firstKeyOf(Node.class, NodeKey.class).getId().getValue();
             BigInteger nDpId = getDpnIdFromNodeName(sNode);
 
             ReadOnlyTransaction trans = provider.getReadTranaction();
             Optional<FlowCapableNode> flowNode = Optional.absent();
-
             //initialize the counter
             int counter = 0;
             try {
                 flowNode = trans.read(LogicalDatastoreType.CONFIGURATION, nodeIdentity).get();
             } catch (Exception e) {
                 LOG.warn("Fail with read Config/DS for Node {} !", nodeIdentity, e);
+                return false;
             }
 
             if (flowNode.isPresent()) {
@@ -413,6 +457,7 @@ public class FlowNodeReconciliationImpl implements FlowNodeReconciliation {
             }
         /* clean transaction */
             trans.close();
+            return true;
         }
 
         /**