Fixed aggregated statistics
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / services / OpendaylightFlowStatisticsServiceImpl.java
index f6b57059a39f775eca4237c65be06902d1b1ddc1..751da7ebfd3dba79784405fea59daa4b106bf069 100644 (file)
@@ -8,24 +8,37 @@
 package org.opendaylight.openflowplugin.impl.statistics.services;
 
 import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Future;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.MessageTranslator;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
+import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
 import org.opendaylight.openflowplugin.impl.services.CommonService;
 import org.opendaylight.openflowplugin.impl.services.DataCrate;
 import org.opendaylight.openflowplugin.impl.services.OFJResult2RequestCtxFuture;
 import org.opendaylight.openflowplugin.impl.services.RequestInputUtils;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.match.MatchReactor;
 import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.AggregateFlowStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAllFlowsStatisticsFromAllFlowTablesInput;
@@ -33,16 +46,28 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.G
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetFlowStatisticsFromFlowTableOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.get.aggregate.flow.statistics.from.flow.table._for.given.match.output.AggregatedFlowStatistics;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyAggregateCase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestAggregateCaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.MultipartRequestFlowCaseBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.aggregate._case.MultipartRequestAggregateBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.flow._case.MultipartRequestFlowBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /**
  * @author joe
  */
@@ -100,46 +125,95 @@ public class OpendaylightFlowStatisticsServiceImpl extends CommonService impleme
             final GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput input) {
 
 
-        return this.<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput, Void>handleServiceCall(
-                PRIMARY_CONNECTION,
-                new Function<DataCrate<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput>, ListenableFuture<RpcResult<Void>>>() {
-
-                    @Override
-                    public ListenableFuture<RpcResult<Void>> apply(final DataCrate<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput> data) {
-                        final MultipartRequestAggregateCaseBuilder multipartRequestAggregateCaseBuilder = new MultipartRequestAggregateCaseBuilder();
-                        final MultipartRequestAggregateBuilder mprAggregateRequestBuilder = new MultipartRequestAggregateBuilder();
-                        mprAggregateRequestBuilder.setTableId(input.getTableId());
-                        mprAggregateRequestBuilder.setOutPort(input.getOutPort().longValue());
-                        // TODO: repeating code
-                        if (version == OFConstants.OFP_VERSION_1_3) {
-                            mprAggregateRequestBuilder.setCookie(input.getCookie().getValue());
-                            mprAggregateRequestBuilder.setCookieMask(input.getCookieMask().getValue());
-                            mprAggregateRequestBuilder.setOutGroup(input.getOutGroup());
-                        } else {
-                            mprAggregateRequestBuilder.setOutGroup(OFConstants.OFPG_ANY);
+        ListenableFuture<RpcResult<List<MultipartReply>>> rpcResultListenableFuture = handleServiceCall(
+            PRIMARY_CONNECTION,
+            new Function<DataCrate<List<MultipartReply>>, ListenableFuture<RpcResult<Void>>>() {
+
+                @Override
+                public ListenableFuture<RpcResult<Void>> apply(final DataCrate<List<MultipartReply>> data) {
+                    final Xid xid = data.getRequestContext().getXid();
+                    deviceContext.getOpenflowMessageListenerFacade().registerMultipartXid(xid.getValue());
+                    final MultipartRequestAggregateCaseBuilder multipartRequestAggregateCaseBuilder = new MultipartRequestAggregateCaseBuilder();
+                    final MultipartRequestAggregateBuilder mprAggregateRequestBuilder = new MultipartRequestAggregateBuilder();
+                    final short tableId = MoreObjects.firstNonNull(input.getTableId(), OFConstants.OFPTT_ALL).shortValue();
+                    mprAggregateRequestBuilder.setTableId(tableId);
+                    long outputPortValue = MoreObjects.firstNonNull(input.getOutPort(), OFConstants.OFPP_ANY).longValue();
+                    mprAggregateRequestBuilder.setOutPort(outputPortValue);
+                    // TODO: repeating code
+                    if (version == OFConstants.OFP_VERSION_1_3) {
+
+                        if (input.getCookie() == null) {
                             mprAggregateRequestBuilder.setCookie(OFConstants.DEFAULT_COOKIE);
-                            mprAggregateRequestBuilder.setCookieMask(OFConstants.DEFAULT_COOKIE_MASK);
+                        } else {
+                            mprAggregateRequestBuilder.setCookie(MoreObjects.firstNonNull(input.getCookie().getValue(), OFConstants.DEFAULT_COOKIE));
                         }
 
-                        MatchReactor.getInstance().convert(input.getMatch(), version, mprAggregateRequestBuilder,
-                                deviceContext.getPrimaryConnectionContext().getFeatures().getDatapathId());
-
-                        FlowCreatorUtil.setWildcardedFlowMatch(version, mprAggregateRequestBuilder);
-
-                        // Set request body to main multipart request
-                        multipartRequestAggregateCaseBuilder.setMultipartRequestAggregate(mprAggregateRequestBuilder
-                                .build());
-
-                        final Xid xid = data.getRequestContext().getXid();
-                        final MultipartRequestInputBuilder mprInput = RequestInputUtils.createMultipartHeader(
-                                MultipartType.OFPMPAGGREGATE, xid.getValue(), version);
+                        if (input.getCookieMask() == null) {
+                            mprAggregateRequestBuilder.setCookieMask(OFConstants.DEFAULT_COOKIE_MASK);
+                        } else {
+                            mprAggregateRequestBuilder.setCookieMask(MoreObjects.firstNonNull(input.getCookieMask().getValue(), OFConstants.DEFAULT_COOKIE_MASK));
+                        }
+                        long outGroup = MoreObjects.firstNonNull(input.getOutGroup(), OFConstants.OFPG_ANY).longValue();
+                        mprAggregateRequestBuilder.setOutGroup(outGroup);
+                    } else {
+                        mprAggregateRequestBuilder.setOutGroup(OFConstants.OFPG_ANY);
+                        mprAggregateRequestBuilder.setCookie(OFConstants.DEFAULT_COOKIE);
+                        mprAggregateRequestBuilder.setCookieMask(OFConstants.DEFAULT_COOKIE_MASK);
+                    }
 
-                        mprInput.setMultipartRequestBody(multipartRequestAggregateCaseBuilder.build());
-                        final Future<RpcResult<Void>> resultFromOFLib = deviceContext.getPrimaryConnectionContext()
-                                .getConnectionAdapter().multipartRequest(mprInput.build());
-                        return JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    MatchReactor.getInstance().convert(input.getMatch(), version, mprAggregateRequestBuilder,
+                            deviceContext.getPrimaryConnectionContext().getFeatures().getDatapathId());
+
+                    FlowCreatorUtil.setWildcardedFlowMatch(version, mprAggregateRequestBuilder);
+
+                    // Set request body to main multipart request
+                    multipartRequestAggregateCaseBuilder.setMultipartRequestAggregate(mprAggregateRequestBuilder
+                            .build());
+
+                    final MultipartRequestInputBuilder mprInput = RequestInputUtils.createMultipartHeader(
+                            MultipartType.OFPMPAGGREGATE, xid.getValue(), version);
+
+                    mprInput.setMultipartRequestBody(multipartRequestAggregateCaseBuilder.build());
+                    final Future<RpcResult<Void>> resultFromOFLib = deviceContext.getPrimaryConnectionContext()
+                            .getConnectionAdapter().multipartRequest(mprInput.build());
+                    return JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                }
+            });
+
+        return Futures.transform(rpcResultListenableFuture, new Function<RpcResult<List<MultipartReply>>, RpcResult<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput>>() {
+            @Nullable
+            @Override
+            public RpcResult<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput> apply(RpcResult<List<MultipartReply>> input) {
+                TranslatorLibrary translatorLibrary = deviceContext.oook();
+                RpcResult<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput> rpcResult;
+                if(input.isSuccessful()) {
+                    MultipartReply reply = input.getResult().get(0);
+                    final TranslatorKey translatorKey = new TranslatorKey(reply.getVersion(), MultipartReplyAggregateCase.class.getName());
+                    final MessageTranslator<MultipartReply, AggregatedFlowStatistics> messageTranslator = translatorLibrary.lookupTranslator(translatorKey);
+                    List<AggregatedFlowStatistics> aggregStats = new ArrayList<AggregatedFlowStatistics>();
+
+                    for (MultipartReply multipartReply : input.getResult()) {
+                        aggregStats.add(messageTranslator.translate(multipartReply, deviceContext, null));
                     }
-                });
+
+                    GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutputBuilder getAggregateFlowStatisticsFromFlowTableForGivenMatchOutputBuilder =
+                            new GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutputBuilder();
+                    getAggregateFlowStatisticsFromFlowTableForGivenMatchOutputBuilder.setAggregatedFlowStatistics(aggregStats);
+
+                    rpcResult = RpcResultBuilder
+                            .<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput>success()
+                            .withResult(getAggregateFlowStatisticsFromFlowTableForGivenMatchOutputBuilder.build())
+                            .build();
+
+                } else {
+                    rpcResult = RpcResultBuilder
+                            .<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput>failed()
+                            .withRpcErrors(input.getErrors())
+                            .build();
+                }
+                return rpcResult;
+            }
+        });
 
     }