Bump MRI upstreams
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / sal / SalFlowServiceImpl.java
index 6d9e09b8bf9111f12b142df030f5977f222ef881..6542234329a4d3fb7d08d9221c4d503b7c0e7a80 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -7,16 +7,15 @@
  */
 package org.opendaylight.openflowplugin.impl.services.sal;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.FlowGroupStatus;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
@@ -29,11 +28,8 @@ import org.opendaylight.openflowplugin.impl.services.singlelayer.SingleLayerFlow
 import org.opendaylight.openflowplugin.impl.util.ErrorUtil;
 import org.opendaylight.openflowplugin.impl.util.FlowCreatorUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
@@ -45,18 +41,20 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.Upda
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SalFlowServiceImpl implements SalFlowService {
     private static final Logger LOG = LoggerFactory.getLogger(SalFlowServiceImpl.class);
+    private static final Uint8 OFPTT_ALL = Uint8.MAX_VALUE;
+
     private final MultiLayerFlowService<UpdateFlowOutput> flowUpdate;
     private final MultiLayerFlowService<AddFlowOutput> flowAdd;
     private final MultiLayerFlowService<RemoveFlowOutput> flowRemove;
@@ -87,43 +85,42 @@ public class SalFlowServiceImpl implements SalFlowService {
     }
 
     @Override
-    public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
+    public ListenableFuture<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
         final FlowRegistryKey flowRegistryKey =
                 FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
         final ListenableFuture<RpcResult<AddFlowOutput>> future;
-
         if (flowAddMessage.canUseSingleLayerSerialization()) {
             future = flowAddMessage.handleServiceCall(input);
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey));
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey),
+                    MoreExecutors.directExecutor());
         } else {
             future = flowAdd.processFlowModInputBuilders(flowAdd.toFlowModInputs(input));
-            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey));
+            Futures.addCallback(future, new AddFlowCallback(input, flowRegistryKey),
+                    MoreExecutors.directExecutor());
 
         }
         return future;
     }
 
     @Override
-    public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
+    public ListenableFuture<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
         final ListenableFuture<RpcResult<RemoveFlowOutput>> future;
-
         if (flowRemoveMessage.canUseSingleLayerSerialization()) {
             future = flowRemoveMessage.handleServiceCall(input);
-            Futures.addCallback(future, new RemoveFlowCallback(input));
+            Futures.addCallback(future, new RemoveFlowCallback(input), MoreExecutors.directExecutor());
 
         } else {
             future = flowRemove.processFlowModInputBuilders(flowRemove.toFlowModInputs(input));
-            Futures.addCallback(future, new RemoveFlowCallback(input));
+            Futures.addCallback(future, new RemoveFlowCallback(input), MoreExecutors.directExecutor());
         }
 
         return future;
     }
 
     @Override
-    public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
+    public ListenableFuture<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
         final UpdatedFlow updated = input.getUpdatedFlow();
         final OriginalFlow original = input.getOriginalFlow();
-
         final List<FlowModInputBuilder> allFlowMods = new ArrayList<>();
         final List<FlowModInputBuilder> ofFlowModInputs;
 
@@ -140,11 +137,11 @@ public class SalFlowServiceImpl implements SalFlowService {
                 Futures.addCallback(listListenableFuture, new FutureCallback<List<RpcResult<UpdateFlowOutput>>>() {
                     @Override
                     public void onSuccess(final List<RpcResult<UpdateFlowOutput>> results) {
-                        final ArrayList<RpcError> errors = new ArrayList();
+                        final ArrayList<RpcError> errors = new ArrayList<>();
                         for (RpcResult<UpdateFlowOutput> flowModResult : results) {
                             if (flowModResult == null) {
                                 errors.add(RpcResultBuilder.newError(
-                                        RpcError.ErrorType.PROTOCOL, OFConstants.APPLICATION_TAG,
+                                        ErrorType.PROTOCOL, OFConstants.APPLICATION_TAG,
                                         "unexpected flowMod result (null) occurred"));
                             } else if (!flowModResult.isSuccessful()) {
                                 errors.addAll(flowModResult.getErrors());
@@ -166,7 +163,7 @@ public class SalFlowServiceImpl implements SalFlowService {
                         RpcResultBuilder<UpdateFlowOutput> rpcResultBuilder = RpcResultBuilder.failed();
                         objectSettableFuture.set(rpcResultBuilder.build());
                     }
-                });
+                }, MoreExecutors.directExecutor());
 
                 future = objectSettableFuture;
             } else {
@@ -192,61 +189,58 @@ public class SalFlowServiceImpl implements SalFlowService {
             future = flowUpdate.processFlowModInputBuilders(allFlowMods);
         }
 
-        Futures.addCallback(future, new UpdateFlowCallback(input));
+        Futures.addCallback(future, new UpdateFlowCallback(input), MoreExecutors.directExecutor());
         return future;
     }
 
-    @VisibleForTesting
-    private static KeyedInstanceIdentifier<Flow, FlowKey> createFlowPath(
-                                                                    FlowDescriptor flowDescriptor,
-                                                                    KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
-        return nodePath.augmentation(FlowCapableNode.class)
-                .child(Table.class, flowDescriptor.getTableKey())
-                .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
-    }
-
-    private class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
+    private final class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
         private final AddFlowInput input;
         private final FlowRegistryKey flowRegistryKey;
 
-        private AddFlowCallback(final AddFlowInput input,
-                                final FlowRegistryKey flowRegistryKey) {
+        private AddFlowCallback(final AddFlowInput input, final FlowRegistryKey flowRegistryKey) {
             this.input = input;
             this.flowRegistryKey = flowRegistryKey;
         }
 
         @Override
         public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
-            if (rpcResult.isSuccessful()) {
-                final FlowDescriptor flowDescriptor;
-
-                if (Objects.nonNull(input.getFlowRef())) {
-                    final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
-                    flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                    deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegistryKey, flowDescriptor);
-                } else {
-                    deviceContext.getDeviceFlowRegistry().store(flowRegistryKey);
-                    flowDescriptor = deviceContext.getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey);
-                }
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Flow add with id={} finished without error", flowDescriptor.getFlowId().getValue());
-                }
-            } else {
+            if (!rpcResult.isSuccessful()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow add failed for flow={}, errors={}", input,
                             ErrorUtil.errorsToString(rpcResult.getErrors()));
                 }
+                return;
+            }
+
+            final DeviceFlowRegistry flowRegistry = deviceContext.getDeviceFlowRegistry();
+            final FlowDescriptor flowDescriptor;
+            final FlowRef flowRef = input.getFlowRef();
+            if (flowRef != null) {
+                final Uint8 tableId = input.getTableId();
+                final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                flowDescriptor = FlowDescriptorFactory.create(tableId, flowId);
+
+                // FIXME: this looks like an atomic operation
+                flowRegistry.appendHistoryFlow(flowId, tableId, FlowGroupStatus.ADDED);
+                flowRegistry.storeDescriptor(flowRegistryKey, flowDescriptor);
+            } else {
+                // FIXME: this looks like an atomic operation
+                flowRegistry.store(flowRegistryKey);
+                flowDescriptor = flowRegistry.retrieveDescriptor(flowRegistryKey);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Flow add with id={} finished without error", flowDescriptor.getFlowId().getValue());
             }
         }
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for adding flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for adding flow={} failed", input, throwable);
         }
     }
 
-    private class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
+    private final class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
         private final RemoveFlowInput input;
 
         private RemoveFlowCallback(final RemoveFlowInput input) {
@@ -259,9 +253,20 @@ public class SalFlowServiceImpl implements SalFlowService {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow remove finished without error for flow={}", input);
                 }
-                FlowRegistryKey flowRegistryKey =
-                        FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
-                deviceContext.getDeviceFlowRegistry().addMark(flowRegistryKey);
+                final DeviceFlowRegistry flowRegistry = deviceContext.getDeviceFlowRegistry();
+                if (input.getTableId() != null && !input.getTableId().equals(OFPTT_ALL)) {
+                    FlowRegistryKey flowRegistryKey =
+                            FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), input);
+                    flowRegistry.addMark(flowRegistryKey);
+
+                    final FlowRef flowRef = input.getFlowRef();
+                    if (flowRef != null) {
+                        final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                        flowRegistry.appendHistoryFlow(flowId, input.getTableId(), FlowGroupStatus.REMOVED);
+                    }
+                } else {
+                    flowRegistry.clearFlowRegistry();
+                }
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Flow remove failed for flow={}, errors={}", input,
@@ -272,21 +277,20 @@ public class SalFlowServiceImpl implements SalFlowService {
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for removing flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for removing flow={} failed", input, throwable);
         }
     }
 
-    private class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
+    private final class UpdateFlowCallback implements FutureCallback<RpcResult<UpdateFlowOutput>> {
         private final UpdateFlowInput input;
 
-        private UpdateFlowCallback(UpdateFlowInput input) {
+        private UpdateFlowCallback(final UpdateFlowInput input) {
             this.input = input;
         }
 
         @Override
         public void onSuccess(final RpcResult<UpdateFlowOutput> updateFlowOutputRpcResult) {
             final DeviceFlowRegistry deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
-
             final UpdatedFlow updated = input.getUpdatedFlow();
             final OriginalFlow original = input.getOriginalFlow();
             final FlowRegistryKey origFlowRegistryKey =
@@ -295,20 +299,21 @@ public class SalFlowServiceImpl implements SalFlowService {
                     FlowRegistryKeyFactory.create(deviceContext.getDeviceInfo().getVersion(), updated);
             final FlowDescriptor origFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(origFlowRegistryKey);
 
-            final boolean isUpdate = Objects.nonNull(origFlowDescriptor);
+            final boolean isUpdate = origFlowDescriptor != null;
             final FlowDescriptor updatedFlowDescriptor;
-
-            if (Objects.nonNull(input.getFlowRef())) {
-                updatedFlowDescriptor =
-                        FlowDescriptorFactory.create(updated.getTableId(),
-                                                     input.getFlowRef().getValue().firstKeyOf(Flow.class).getId());
+            final FlowRef flowRef = input.getFlowRef();
+            if (flowRef != null) {
+                final Uint8 tableId = updated.getTableId();
+                final FlowId flowId = flowRef.getValue().firstKeyOf(Flow.class).getId();
+                // FIXME: this does not look right, we probably want better integration
+                deviceFlowRegistry.appendHistoryFlow(flowId, tableId, FlowGroupStatus.MODIFIED);
+
+                updatedFlowDescriptor = FlowDescriptorFactory.create(tableId, flowId);
+            } else if (isUpdate) {
+                updatedFlowDescriptor = origFlowDescriptor;
             } else {
-                if (isUpdate) {
-                    updatedFlowDescriptor = origFlowDescriptor;
-                } else {
-                    deviceFlowRegistry.store(updatedFlowRegistryKey);
-                    updatedFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(updatedFlowRegistryKey);
-                }
+                deviceFlowRegistry.store(updatedFlowRegistryKey);
+                updatedFlowDescriptor = deviceFlowRegistry.retrieveDescriptor(updatedFlowRegistryKey);
             }
 
             if (isUpdate) {
@@ -319,7 +324,7 @@ public class SalFlowServiceImpl implements SalFlowService {
 
         @Override
         public void onFailure(final Throwable throwable) {
-            LOG.warn("Service call for updating flow={} failed, reason: {}", input, throwable);
+            LOG.warn("Service call for updating flow={} failed", input, throwable);
         }
     }
 }