Bump MRI upstreams
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlowServiceImpl.java
index a9f4c04e6439e66a8d246ec33ff9d5c571eca8f3..6542234329a4d3fb7d08d9221c4d503b7c0e7a80 100644 (file)
@@ -7,20 +7,14 @@
  */
 package org.opendaylight.openflowplugin.impl.services.sal;
 
-import com.google.common.collect.EvictingQueue;
-import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
 import org.opendaylight.openflowplugin.api.OFConstants;
-import org.opendaylight.openflowplugin.api.openflow.FlowGroupCache;
-import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
 import org.opendaylight.openflowplugin.api.openflow.FlowGroupStatus;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
@@ -33,7 +27,6 @@ import org.opendaylight.openflowplugin.impl.services.multilayer.MultiLayerFlowSe
 import org.opendaylight.openflowplugin.impl.services.singlelayer.SingleLayerFlowService;
 import org.opendaylight.openflowplugin.impl.util.ErrorUtil;
 import org.opendaylight.openflowplugin.impl.util.FlowCreatorUtil;
-import org.opendaylight.openflowplugin.impl.util.PathUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
@@ -48,8 +41,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.Upda
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -59,6 +53,8 @@ import org.slf4j.LoggerFactory;
 
 public class SalFlowServiceImpl implements SalFlowService {
     private static final Logger LOG = LoggerFactory.getLogger(SalFlowServiceImpl.class);
+    private static final Uint8 OFPTT_ALL = Uint8.MAX_VALUE;
+
     private final MultiLayerFlowService<UpdateFlowOutput> flowUpdate;
     private final MultiLayerFlowService<AddFlowOutput> flowAdd;
     private final MultiLayerFlowService<RemoveFlowOutput> flowRemove;
@@ -66,17 +62,11 @@ public class SalFlowServiceImpl implements SalFlowService {
     private final SingleLayerFlowService<UpdateFlowOutput> flowUpdateMessage;
     private final SingleLayerFlowService<RemoveFlowOutput> flowRemoveMessage;
     private final DeviceContext deviceContext;
-    private static final Uint8 OFPTT_ALL = Uint8.MAX_VALUE;
-    private final FlowGroupCacheManager provider;
-    public static final int FLOWGROUP_CACHE_SIZE = 10000;
-
 
     public SalFlowServiceImpl(final RequestContextStack requestContextStack,
                               final DeviceContext deviceContext,
-                              final ConvertorExecutor convertorExecutor,
-                              final FlowGroupCacheManager provider) {
+                              final ConvertorExecutor convertorExecutor) {
         this.deviceContext = deviceContext;
-        this.provider = provider;
         flowRemove = new MultiLayerFlowService<>(requestContextStack,
                                                  deviceContext,
                                                  RemoveFlowOutput.class,
@@ -99,14 +89,13 @@ public class SalFlowServiceImpl implements SalFlowService {
         final FlowRegistryKey flowRegistryKey =
                 FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
         final ListenableFuture<RpcResult<AddFlowOutput>> future;
-        NodeId nodeId = PathUtil.extractNodeId(input.getNode());
         if (flowAddMessage.canUseSingleLayerSerialization()) {
             future = flowAddMessage.handleServiceCall(input);
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey, nodeId),
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey),
                     MoreExecutors.directExecutor());
         } else {
             future = flowAdd.processFlowModInputBuilders(flowAdd.toFlowModInputs(input));
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey, nodeId),
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey),
                     MoreExecutors.directExecutor());
 
         }
@@ -116,14 +105,13 @@ public class SalFlowServiceImpl implements SalFlowService {
     @Override
     public ListenableFuture<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
         final ListenableFuture<RpcResult<RemoveFlowOutput>> future;
-        NodeId nodeId = PathUtil.extractNodeId(input.getNode());
         if (flowRemoveMessage.canUseSingleLayerSerialization()) {
             future = flowRemoveMessage.handleServiceCall(input);
-            Futures.addCallback(future, new RemoveFlowCallback(input, nodeId), MoreExecutors.directExecutor());
+            Futures.addCallback(future, new RemoveFlowCallback(input), MoreExecutors.directExecutor());
 
         } else {
             future = flowRemove.processFlowModInputBuilders(flowRemove.toFlowModInputs(input));
-            Futures.addCallback(future, new RemoveFlowCallback(input, nodeId), MoreExecutors.directExecutor());
+            Futures.addCallback(future, new RemoveFlowCallback(input), MoreExecutors.directExecutor());
         }
 
         return future;
@@ -133,7 +121,6 @@ public class SalFlowServiceImpl implements SalFlowService {
     public ListenableFuture<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
         final UpdatedFlow updated = input.getUpdatedFlow();
         final OriginalFlow original = input.getOriginalFlow();
-        String nodeId =  PathUtil.extractNodeId(input.getNode()).getValue();
         final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
         final List<FlowModInputBuilder> ofFlowModInputs;
 
@@ -150,11 +137,11 @@ public class SalFlowServiceImpl implements SalFlowService {
                 Futures.addCallback(listListenableFuture, new FutureCallback<List<RpcResult<UpdateFlowOutput>>>() {
                     @Override
                     public void onSuccess(final List<RpcResult<UpdateFlowOutput>> results) {
-                        final ArrayList<RpcError> errors = new ArrayList();
+                        final ArrayList<RpcError> errors = new ArrayList<>();
                         for (RpcResult<UpdateFlowOutput> flowModResult : results) {
                             if (flowModResult == null) {
                                 errors.add(RpcResultBuilder.newError(
-                                        RpcError.ErrorType.PROTOCOL, OFConstants.APPLICATION_TAG,
+                                        ErrorType.PROTOCOL, OFConstants.APPLICATION_TAG,
                                         "unexpected flowMod result (null) occurred"));
                             } else if (!flowModResult.isSuccessful()) {
                                 errors.addAll(flowModResult.getErrors());
@@ -202,54 +189,48 @@ public class SalFlowServiceImpl implements SalFlowService {
             future = flowUpdate.processFlowModInputBuilders(allFlowMods);
         }
 
-        Futures.addCallback(future, new UpdateFlowCallback(input, nodeId), MoreExecutors.directExecutor());
+        Futures.addCallback(future, new UpdateFlowCallback(input), MoreExecutors.directExecutor());
         return future;
     }
 
     private final class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
         private final AddFlowInput input;
         private final FlowRegistryKey flowRegistryKey;
-        private final NodeId nodeId;
 
-        private AddFlowCallback(final AddFlowInput input,
-                                final FlowRegistryKey flowRegistryKey,
-                                final NodeId nodeId) {
+        private AddFlowCallback(final AddFlowInput input, final FlowRegistryKey flowRegistryKey) {
             this.input = input;
             this.flowRegistryKey = flowRegistryKey;
-            this.nodeId = nodeId;
         }
 
         @Override
         public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
-            if (rpcResult.isSuccessful()) {
-                final FlowDescriptor flowDescriptor;
-                final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
-                FlowGroupCache cache = new FlowGroupCache(flowId.getValue(), input.getTableId().toString(),
-                        FlowGroupStatus.ADDED, LocalDateTime.now());
-                if (provider.getAllNodesFlowGroupCache().containsKey(nodeId.getValue())) {
-                    provider.getAllNodesFlowGroupCache().get(nodeId.getValue()).add(cache);
-                } else {
-                    Queue<FlowGroupCache> flowGroupCacheList =
-                            Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
-                    flowGroupCacheList.add(cache);
-                    provider.getAllNodesFlowGroupCache().put(nodeId.getValue(), flowGroupCacheList);
-                }
-                if (input.getFlowRef() != null) {
-                    flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                    deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegistryKey, flowDescriptor);
-                } else {
-                    deviceContext.getDeviceFlowRegistry().store(flowRegistryKey);
-                    flowDescriptor = deviceContext.getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey);
-                }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Flow add with id={} finished without error", flowDescriptor.getFlowId().getValue());
-                }
-            } else {
+            if (!rpcResult.isSuccessful()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow add failed for flow={}, errors={}", input,
                             ErrorUtil.errorsToString(rpcResult.getErrors()));
                 }
+                return;
+            }
+
+            final DeviceFlowRegistry flowRegistry = deviceContext.getDeviceFlowRegistry();
+            final FlowDescriptor flowDescriptor;
+            final FlowRef flowRef = input.getFlowRef();
+            if (flowRef != null) {
+                final Uint8 tableId = input.getTableId();
+                final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                flowDescriptor = FlowDescriptorFactory.create(tableId, flowId);
+
+                // FIXME: this looks like an atomic operation
+                flowRegistry.appendHistoryFlow(flowId, tableId, FlowGroupStatus.ADDED);
+                flowRegistry.storeDescriptor(flowRegistryKey, flowDescriptor);
+            } else {
+                // FIXME: this looks like an atomic operation
+                flowRegistry.store(flowRegistryKey);
+                flowDescriptor = flowRegistry.retrieveDescriptor(flowRegistryKey);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Flow add with id={} finished without error", flowDescriptor.getFlowId().getValue());
             }
         }
 
@@ -261,11 +242,9 @@ public class SalFlowServiceImpl implements SalFlowService {
 
     private final class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
         private final RemoveFlowInput input;
-        private final NodeId nodeId;
 
-        private RemoveFlowCallback(final RemoveFlowInput input, final NodeId nodeId) {
+        private RemoveFlowCallback(final RemoveFlowInput input) {
             this.input = input;
-            this.nodeId = nodeId;
         }
 
         @Override
@@ -274,24 +253,19 @@ public class SalFlowServiceImpl implements SalFlowService {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow remove finished without error for flow={}", input);
                 }
+                final DeviceFlowRegistry flowRegistry = deviceContext.getDeviceFlowRegistry();
                 if (input.getTableId() != null && !input.getTableId().equals(OFPTT_ALL)) {
                     FlowRegistryKey flowRegistryKey =
                             FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
-                    deviceContext.getDeviceFlowRegistry().addMark(flowRegistryKey);
-                    final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
-                    FlowGroupCache cache = new FlowGroupCache(flowId.getValue(),
-                            input.getTableId().toString(), FlowGroupStatus.REMOVED,
-                            LocalDateTime.now());
-                    if (provider.getAllNodesFlowGroupCache().containsKey(nodeId.getValue())) {
-                        provider.getAllNodesFlowGroupCache().get(nodeId.getValue()).add(cache);
-                    } else {
-                        Queue<FlowGroupCache> flowGroupCacheList =
-                                Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
-                        flowGroupCacheList.add(cache);
-                        provider.getAllNodesFlowGroupCache().put(nodeId.getValue(), flowGroupCacheList);
+                    flowRegistry.addMark(flowRegistryKey);
+
+                    final FlowRef flowRef = input.getFlowRef();
+                    if (flowRef != null) {
+                        final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                        flowRegistry.appendHistoryFlow(flowId, input.getTableId(), FlowGroupStatus.REMOVED);
                     }
                 } else {
-                    deviceContext.getDeviceFlowRegistry().clearFlowRegistry();
+                    flowRegistry.clearFlowRegistry();
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
@@ -309,17 +283,14 @@ public class SalFlowServiceImpl implements SalFlowService {
 
     private final class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
         private final UpdateFlowInput input;
-        private final String nodeId;
 
-        private UpdateFlowCallback(UpdateFlowInput input,  String nodeId) {
+        private UpdateFlowCallback(final UpdateFlowInput input) {
             this.input = input;
-            this.nodeId = nodeId;
         }
 
         @Override
         public void onSuccess(final RpcResult<UpdateFlowOutput> updateFlowOutputRpcResult) {
             final DeviceFlowRegistry deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
-            final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class).getId();
             final UpdatedFlow updated = input.getUpdatedFlow();
             final OriginalFlow original = input.getOriginalFlow();
             final FlowRegistryKey origFlowRegistryKey =
@@ -330,29 +301,19 @@ public class SalFlowServiceImpl implements SalFlowService {
 
             final boolean isUpdate = origFlowDescriptor != null;
             final FlowDescriptor updatedFlowDescriptor;
-            FlowGroupCache cache = new FlowGroupCache(flowId.getValue(), updated.getTableId().toString(),
-                    FlowGroupStatus.MODIFIED,
-                    LocalDateTime.now());
-            if (provider.getAllNodesFlowGroupCache().containsKey(nodeId)) {
-                provider.getAllNodesFlowGroupCache().get(nodeId).add(cache);
+            final FlowRef flowRef = input.getFlowRef();
+            if (flowRef != null) {
+                final Uint8 tableId = updated.getTableId();
+                final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                // FIXME: this does not look right, we probably want better integration
+                deviceFlowRegistry.appendHistoryFlow(flowId, tableId, FlowGroupStatus.MODIFIED);
+
+                updatedFlowDescriptor = FlowDescriptorFactory.create(tableId, flowId);
+            } else if (isUpdate) {
+                updatedFlowDescriptor = origFlowDescriptor;
             } else {
-                Queue<FlowGroupCache> flowGroupCacheList =
-                        Queues.synchronizedQueue(EvictingQueue.create(FLOWGROUP_CACHE_SIZE));
-                flowGroupCacheList.add(cache);
-                provider.getAllNodesFlowGroupCache().put(nodeId, flowGroupCacheList);
-            }
-
-            if (input.getFlowRef() != null) {
-                updatedFlowDescriptor =
-                        FlowDescriptorFactory.create(updated.getTableId(),
-                                                     input.getFlowRef().getValue().firstKeyOf(Flow.class).getId());
-            } else {
-                if (isUpdate) {
-                    updatedFlowDescriptor = origFlowDescriptor;
-                } else {
-                    deviceFlowRegistry.store(updatedFlowRegistryKey);
-                    updatedFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(updatedFlowRegistryKey);
-                }
+                deviceFlowRegistry.store(updatedFlowRegistryKey);
+                updatedFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(updatedFlowRegistryKey);
             }
 
             if (isUpdate) {