Merge "openflow devices are getting disconnected from Controller, sending OFPBFC_BAD_...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlowServiceImpl.java
index ff1f3c8ea2b4bd6fe613ca80d0a79f49266a74a2..a9f4c04e6439e66a8d246ec33ff9d5c571eca8f3 100644 (file)
@@ -7,14 +7,21 @@
  */
 package org.opendaylight.openflowplugin.impl.services.sal;
 
+import com.google.common.collect.EvictingQueue;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCache;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupStatus;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
@@ -26,6 +33,7 @@ import org.opendaylight.openflowplugin.impl.services.multilayer.MultiLayerFlowSe
 import org.opendaylight.openflowplugin.impl.services.singlelayer.SingleLayerFlowService;
 import org.opendaylight.openflowplugin.impl.util.ErrorUtil;
 import org.opendaylight.openflowplugin.impl.util.FlowCreatorUtil;
+import org.opendaylight.openflowplugin.impl.util.PathUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
@@ -40,10 +48,12 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.Upda
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,11 +66,17 @@ public class SalFlowServiceImpl implements SalFlowService {
     private final SingleLayerFlowService<UpdateFlowOutput> flowUpdateMessage;
     private final SingleLayerFlowService<RemoveFlowOutput> flowRemoveMessage;
     private final DeviceContext deviceContext;
+    private static final Uint8 OFPTT_ALL = Uint8.MAX_VALUE;
+    private final FlowGroupCacheManager provider;
+    public static final int FLOWGROUP_CACHE_SIZE = 10000;
+
 
     public SalFlowServiceImpl(final RequestContextStack requestContextStack,
                               final DeviceContext deviceContext,
-                              final ConvertorExecutor convertorExecutor) {
+                              final ConvertorExecutor convertorExecutor,
+                              final FlowGroupCacheManager provider) {
         this.deviceContext = deviceContext;
+        this.provider = provider;
         flowRemove = new MultiLayerFlowService<>(requestContextStack,
                                                  deviceContext,
                                                  RemoveFlowOutput.class,
@@ -83,13 +99,15 @@ public class SalFlowServiceImpl implements SalFlowService {
         final FlowRegistryKey flowRegistryKey =
                 FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
         final ListenableFuture<RpcResult<AddFlowOutput>> future;
-
+        NodeId nodeId = PathUtil.extractNodeId(input.getNode());
         if (flowAddMessage.canUseSingleLayerSerialization()) {
             future = flowAddMessage.handleServiceCall(input);
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey), MoreExecutors.directExecutor());
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey, nodeId),
+                    MoreExecutors.directExecutor());
         } else {
             future = flowAdd.processFlowModInputBuilders(flowAdd.toFlowModInputs(input));
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey), MoreExecutors.directExecutor());
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey, nodeId),
+                    MoreExecutors.directExecutor());
 
         }
         return future;
@@ -98,14 +116,14 @@ public class SalFlowServiceImpl implements SalFlowService {
     @Override
     public ListenableFuture<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
         final ListenableFuture<RpcResult<RemoveFlowOutput>> future;
-
+        NodeId nodeId = PathUtil.extractNodeId(input.getNode());
         if (flowRemoveMessage.canUseSingleLayerSerialization()) {
             future = flowRemoveMessage.handleServiceCall(input);
-            Futures.addCallback(future, new RemoveFlowCallback(input), MoreExecutors.directExecutor());
+            Futures.addCallback(future, new RemoveFlowCallback(input, nodeId), MoreExecutors.directExecutor());
 
         } else {
             future = flowRemove.processFlowModInputBuilders(flowRemove.toFlowModInputs(input));
-            Futures.addCallback(future, new RemoveFlowCallback(input), MoreExecutors.directExecutor());
+            Futures.addCallback(future, new RemoveFlowCallback(input, nodeId), MoreExecutors.directExecutor());
         }
 
         return future;
@@ -115,7 +133,7 @@ public class SalFlowServiceImpl implements SalFlowService {
     public ListenableFuture<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
         final UpdatedFlow updated = input.getUpdatedFlow();
         final OriginalFlow original = input.getOriginalFlow();
-
+        String nodeId =  PathUtil.extractNodeId(input.getNode()).getValue();
         final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
         final List<FlowModInputBuilder> ofFlowModInputs;
 
@@ -184,27 +202,39 @@ public class SalFlowServiceImpl implements SalFlowService {
             future = flowUpdate.processFlowModInputBuilders(allFlowMods);
         }
 
-        Futures.addCallback(future, new UpdateFlowCallback(input), MoreExecutors.directExecutor());
+        Futures.addCallback(future, new UpdateFlowCallback(input, nodeId), MoreExecutors.directExecutor());
         return future;
     }
 
     private final class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
         private final AddFlowInput input;
         private final FlowRegistryKey flowRegistryKey;
+        private final NodeId nodeId;
 
         private AddFlowCallback(final AddFlowInput input,
-                                final FlowRegistryKey flowRegistryKey) {
+                                final FlowRegistryKey flowRegistryKey,
+                                final NodeId nodeId) {
             this.input = input;
             this.flowRegistryKey = flowRegistryKey;
+            this.nodeId = nodeId;
         }
 
         @Override
         public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
             if (rpcResult.isSuccessful()) {
                 final FlowDescriptor flowDescriptor;
-
+                final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
+                FlowGroupCache cache = new FlowGroupCache(flowId.getValue(), input.getTableId().toString(),
+                        FlowGroupStatus.ADDED, LocalDateTime.now());
+                if (provider.getAllNodesFlowGroupCache().containsKey(nodeId.getValue())) {
+                    provider.getAllNodesFlowGroupCache().get(nodeId.getValue()).add(cache);
+                } else {
+                    Queue<FlowGroupCache> flowGroupCacheList =
+                            Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
+                    flowGroupCacheList.add(cache);
+                    provider.getAllNodesFlowGroupCache().put(nodeId.getValue(), flowGroupCacheList);
+                }
                 if (input.getFlowRef() != null) {
-                    final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
                     flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
                     deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegistryKey, flowDescriptor);
                 } else {
@@ -231,9 +261,11 @@ public class SalFlowServiceImpl implements SalFlowService {
 
     private final class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
         private final RemoveFlowInput input;
+        private final NodeId nodeId;
 
-        private RemoveFlowCallback(final RemoveFlowInput input) {
+        private RemoveFlowCallback(final RemoveFlowInput input, final NodeId nodeId) {
             this.input = input;
+            this.nodeId = nodeId;
         }
 
         @Override
@@ -242,9 +274,25 @@ public class SalFlowServiceImpl implements SalFlowService {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow remove finished without error for flow={}", input);
                 }
-                FlowRegistryKey flowRegistryKey =
-                        FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
-                deviceContext.getDeviceFlowRegistry().addMark(flowRegistryKey);
+                if (input.getTableId() != null && !input.getTableId().equals(OFPTT_ALL)) {
+                    FlowRegistryKey flowRegistryKey =
+                            FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
+                    deviceContext.getDeviceFlowRegistry().addMark(flowRegistryKey);
+                    final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
+                    FlowGroupCache cache = new FlowGroupCache(flowId.getValue(),
+                            input.getTableId().toString(), FlowGroupStatus.REMOVED,
+                            LocalDateTime.now());
+                    if (provider.getAllNodesFlowGroupCache().containsKey(nodeId.getValue())) {
+                        provider.getAllNodesFlowGroupCache().get(nodeId.getValue()).add(cache);
+                    } else {
+                        Queue<FlowGroupCache> flowGroupCacheList =
+                                Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
+                        flowGroupCacheList.add(cache);
+                        provider.getAllNodesFlowGroupCache().put(nodeId.getValue(), flowGroupCacheList);
+                    }
+                } else {
+                    deviceContext.getDeviceFlowRegistry().clearFlowRegistry();
+                }
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow remove failed for flow={}, errors={}", input,
@@ -261,15 +309,17 @@ public class SalFlowServiceImpl implements SalFlowService {
 
     private final class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
         private final UpdateFlowInput input;
+        private final String nodeId;
 
-        private UpdateFlowCallback(UpdateFlowInput input) {
+        private UpdateFlowCallback(UpdateFlowInput input,  String nodeId) {
             this.input = input;
+            this.nodeId = nodeId;
         }
 
         @Override
         public void onSuccess(final RpcResult<UpdateFlowOutput> updateFlowOutputRpcResult) {
             final DeviceFlowRegistry deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
-
+            final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
             final UpdatedFlow updated = input.getUpdatedFlow();
             final OriginalFlow original = input.getOriginalFlow();
             final FlowRegistryKey origFlowRegistryKey =
@@ -280,6 +330,17 @@ public class SalFlowServiceImpl implements SalFlowService {
 
             final boolean isUpdate = origFlowDescriptor != null;
             final FlowDescriptor updatedFlowDescriptor;
+            FlowGroupCache cache = new FlowGroupCache(flowId.getValue(), updated.getTableId().toString(),
+                    FlowGroupStatus.MODIFIED,
+                    LocalDateTime.now());
+            if (provider.getAllNodesFlowGroupCache().containsKey(nodeId)) {
+                provider.getAllNodesFlowGroupCache().get(nodeId).add(cache);
+            } else {
+                Queue<FlowGroupCache> flowGroupCacheList =
+                        Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
+                flowGroupCacheList.add(cache);
+                provider.getAllNodesFlowGroupCache().put(nodeId, flowGroupCacheList);
+            }
 
             if (input.getFlowRef() != null) {
                 updatedFlowDescriptor =