Fix RpcResultBuilder/RpcContext raw references
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / services / SalTableServiceImpl.java
index f90e650e8b965c03676740141572d6ef8a1a2c53..fb84225fefca26fdae13e22c6eafb6ffc80b1001 100644 (file)
@@ -7,17 +7,42 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
+import com.google.common.base.Function;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.openflowplugin.api.OFConstants;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.openflowplugin.api.openflow.device.Xid;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.TableFeaturesConvertor;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.TableFeaturesReplyConvertor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyTableFeaturesCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.table.features._case.MultipartReplyTableFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestTableFeaturesCaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.table.features._case.MultipartRequestTableFeaturesBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.table.features._case.multipart.request.table.features.TableFeatures;
@@ -25,31 +50,34 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.Sal
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.UpdateTableOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeaturesKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
-import java.math.BigInteger;
-import java.util.List;
-import java.util.concurrent.Future;
 
-/**
- * @author joe
- */
 public class SalTableServiceImpl extends CommonService implements SalTableService {
 
     private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(SalTableServiceImpl.class);
 
+    public SalTableServiceImpl(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
+        super(requestContextStack, deviceContext);
+    }
+
     @Override
     public Future<RpcResult<UpdateTableOutput>> updateTable(final UpdateTableInput input) {
-        class FunctionImpl implements Function<UpdateTableOutput> {
+        class FunctionImpl implements
+                Function<RequestContext<List<MultipartReply>>, ListenableFuture<RpcResult<Void>>> {
 
             @Override
-            public Future<RpcResult<UpdateTableOutput>> apply(final BigInteger IDConnection) {
-
-                final SettableFuture<RpcResult<UpdateTableOutput>> result = SettableFuture.create();
+            public ListenableFuture<RpcResult<Void>> apply(final RequestContext<List<MultipartReply>> requestContext) {
+                getMessageSpy().spyMessage(input.getImplementedInterface(),
+                        MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
 
-                final long xid = deviceContext.getNextXid().getValue();
+                final SettableFuture<RpcResult<List<MultipartReply>>> result = SettableFuture.create();
 
                 final MultipartRequestTableFeaturesCaseBuilder caseBuilder = new MultipartRequestTableFeaturesCaseBuilder();
                 final MultipartRequestTableFeaturesBuilder requestBuilder = new MultipartRequestTableFeaturesBuilder();
@@ -59,66 +87,144 @@ public class SalTableServiceImpl extends CommonService implements SalTableServic
                 caseBuilder.setMultipartRequestTableFeatures(requestBuilder.build());
 
                 // Set request body to main multipart request
+                final Xid xid = requestContext.getXid();
+                getDeviceContext().getMultiMsgCollector().registerMultipartXid(xid.getValue());
                 final MultipartRequestInputBuilder mprInput = createMultipartHeader(MultipartType.OFPMPTABLEFEATURES,
-                        xid);
+                        xid.getValue());
                 mprInput.setMultipartRequestBody(caseBuilder.build());
+                final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider();
 
-                final Future<RpcResult<Void>> resultFromOFLib = provideConnectionAdapter(PRIMARY_CONNECTION)
-                        .multipartRequest(mprInput.build());
-                final ListenableFuture<RpcResult<Void>> resultLib = JdkFutureAdapters
-                        .listenInPoolThread(resultFromOFLib);
+                final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+                final MultiMsgCollector multiMsgCollector = getDeviceContext().getMultiMsgCollector();
+                multiMsgCollector.registerMultipartXid(xid.getValue());
 
-                Futures.addCallback(resultLib, new ResultCallback<UpdateTableOutput>(result) {
+                final MultipartRequestInput multipartRequestInput = mprInput.build();
+                outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
                     @Override
-                    public UpdateTableOutput createResult() {
-                        final UpdateTableOutputBuilder queueStatsFromPortBuilder = new UpdateTableOutputBuilder()
-                                .setTransactionId(new TransactionId(BigInteger.valueOf(xid)));
-                        return queueStatsFromPortBuilder.build();
+                    public void onSuccess(final OfHeader ofHeader) {
+                        if (ofHeader instanceof MultipartReply) {
+                            MultipartReply multipartReply = (MultipartReply) ofHeader;
+                            multiMsgCollector.addMultipartMsg(multipartReply);
+                        } else {
+                            if (null != ofHeader) {
+                                LOG.info("Unexpected response type received {}.", ofHeader.getClass());
+                            } else {
+                                LOG.info("Response received is null.");
+                            }
+                        }
+                        getMessageSpy().spyMessage(multipartRequestInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
+                        settableFuture.set(RpcResultBuilder.<Void>success().build());
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable throwable) {
+                        RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
+                        RequestContextUtil.closeRequstContext(requestContext);
+                        getDeviceContext().unhookRequestCtx(requestContext.getXid());
+                        getMessageSpy().spyMessage(multipartRequestInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+                        settableFuture.set(rpcResultBuilder.build());
                     }
                 });
+                return settableFuture;
+            }
+        }
 
-                return result;
+        final ListenableFuture<RpcResult<List<MultipartReply>>> multipartFuture = handleServiceCall(new FunctionImpl());
+        final SettableFuture<RpcResult<UpdateTableOutput>> finalFuture = SettableFuture.create();
+
+        class CallBackImpl implements FutureCallback<RpcResult<List<MultipartReply>>> {
+            private final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(CallBackImpl.class);
+
+            @Override
+            public void onSuccess(final RpcResult<List<MultipartReply>> result) {
+
+                if (result.isSuccessful()) {
+                    final List<MultipartReply> multipartReplies = result.getResult();
+                    if (multipartReplies.isEmpty()) {
+                        LOGGER.debug("Multipart reply to table features request shouldn't be empty list.");
+                        finalFuture.set(RpcResultBuilder.<UpdateTableOutput>failed()
+                                .withError(ErrorType.RPC, "Multipart reply list is empty.").build());
+                    } else {
+                        final Long xid = multipartReplies.get(0).getXid();
+                        LOGGER.debug(
+                                "OnSuccess, rpc result successful, multipart response for rpc update-table with xid {} obtained.",
+                                xid);
+                        final UpdateTableOutputBuilder updateTableOutputBuilder = new UpdateTableOutputBuilder();
+                        updateTableOutputBuilder.setTransactionId(new TransactionId(BigInteger.valueOf(xid)));
+                        finalFuture.set(RpcResultBuilder.success(updateTableOutputBuilder.build()).build());
+                        writeResponseToOperationalDatastore(multipartReplies);
+                    }
+                } else {
+                    LOGGER.debug("OnSuccess, rpc result unsuccessful, multipart response for rpc update-table was unsuccessful.");
+                    finalFuture.set(RpcResultBuilder.<UpdateTableOutput>failed().withRpcErrors(result.getErrors())
+                            .build());
+                }
             }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOGGER.debug("Failure multipart response for table features request. Exception: {}", t);
+                finalFuture.set(RpcResultBuilder.<UpdateTableOutput>failed()
+                        .withError(ErrorType.RPC, "Future error", t).build());
+            }
+
+            /**
+             * @param multipartReplies
+             */
+            private void writeResponseToOperationalDatastore(final List<MultipartReply> multipartReplies) {
+
+                final List<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures> salTableFeatures = convertToSalTableFeatures(multipartReplies);
+
+                final DeviceContext deviceContext = getDeviceContext();
+                final NodeId nodeId = deviceContext.getPrimaryConnectionContext().getNodeId();
+                final InstanceIdentifier<FlowCapableNode> flowCapableNodeII = InstanceIdentifier.create(Nodes.class)
+                        .child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
+                for (org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures tableFeatureData : salTableFeatures) {
+                    final Short tableId = tableFeatureData.getTableId();
+                    KeyedInstanceIdentifier<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures, TableFeaturesKey> tableFeaturesII = flowCapableNodeII
+                            .child(Table.class, new TableKey(tableId))
+                            .child(org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures.class,
+                                    new TableFeaturesKey(tableId));
+                    deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, tableFeaturesII,
+                            tableFeatureData);
+                }
+
+            }
+
+            private List<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures> convertToSalTableFeatures(
+                    final List<MultipartReply> multipartReplies) {
+                final List<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures> salTableFeaturesAll = new ArrayList<>();
+                for (MultipartReply multipartReply : multipartReplies) {
+                    if (multipartReply.getType().equals(MultipartType.OFPMPTABLEFEATURES)) {
+                        MultipartReplyBody multipartReplyBody = multipartReply.getMultipartReplyBody();
+                        if (multipartReplyBody instanceof MultipartReplyTableFeaturesCase) {
+                            MultipartReplyTableFeaturesCase tableFeaturesCase = ((MultipartReplyTableFeaturesCase) multipartReplyBody);
+                            MultipartReplyTableFeatures salTableFeatures = tableFeaturesCase
+                                    .getMultipartReplyTableFeatures();
+                            List<org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures> salTableFeaturesPartial = TableFeaturesReplyConvertor
+                                    .toTableFeaturesReply(salTableFeatures);
+                            salTableFeaturesAll.addAll(salTableFeaturesPartial);
+                            LOGGER.debug("TableFeature {} for xid {}.", salTableFeatures, multipartReply.getXid());
+                        }
+                    }
+                }
+                return salTableFeaturesAll;
+            }
+
         }
 
-        return ServiceCallProcessingUtil.<UpdateTableOutput>handleServiceCall(rpcContext, PRIMARY_CONNECTION,
-                provideWaitTime(), new FunctionImpl());
+        Futures.addCallback(multipartFuture, new CallBackImpl());
+
+        return finalFuture;
     }
 
     private MultipartRequestInputBuilder createMultipartHeader(final MultipartType multipart, final Long xid) {
         final MultipartRequestInputBuilder mprInput = new MultipartRequestInputBuilder();
         mprInput.setType(multipart);
-        mprInput.setVersion(version);
+        mprInput.setVersion(getVersion());
         mprInput.setXid(xid);
         mprInput.setFlags(new MultipartRequestFlags(false));
         return mprInput;
     }
 
-    private abstract static class ResultCallback<T> implements FutureCallback<RpcResult<Void>> {
-
-        private final SettableFuture<RpcResult<T>> result;
-
-        /**
-         * @param result
-         */
-        public ResultCallback(final SettableFuture<RpcResult<T>> result) {
-            this.result = result;
-        }
-
-        public abstract T createResult();
-
-        @Override
-        public void onSuccess(final RpcResult<Void> resultArg) {
-            result.set(RpcResultBuilder.success(createResult()).build());
-        }
-
-        @Override
-        public void onFailure(final Throwable t) {
-            result.set(RpcResultBuilder
-                    .<T>failed()
-                    .withWarning(ErrorType.RPC, OFConstants.ERROR_TAG_TIMEOUT, "something wrong happened",
-                            OFConstants.APPLICATION_TAG, "", t).build());
-        }
-    }
-
 }