X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fstatistics%2Fservices%2Fdedicated%2FStatisticsGatheringService.java;h=7a5685327c93a1e3724590b2d65dd5634f591fcb;hb=69c8d4536858ef685a2fe93763475b182380bbb6;hp=b4bf254a7e5eabd6ddd86a9a483a4bab7e6181ed;hpb=120a9595b96e710078cbf6bf739d9187d186245e;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/dedicated/StatisticsGatheringService.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/dedicated/StatisticsGatheringService.java index b4bf254a7e..7a5685327c 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/dedicated/StatisticsGatheringService.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/dedicated/StatisticsGatheringService.java @@ -9,20 +9,27 @@ package org.opendaylight.openflowplugin.impl.statistics.services.dedicated; import com.google.common.base.Function; -import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import java.util.List; import java.util.concurrent.Future; +import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack; import org.opendaylight.openflowplugin.api.openflow.device.Xid; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector; import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory; import org.opendaylight.openflowplugin.impl.services.CommonService; -import org.opendaylight.openflowplugin.impl.services.DataCrate; +import org.opendaylight.openflowplugin.impl.services.RequestContextUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,22 +47,48 @@ public class StatisticsGatheringService extends CommonService { public Future>> getStatisticsOfType(final MultipartType type) { - return handleServiceCall( - PRIMARY_CONNECTION, new Function>, ListenableFuture>>() { - @Override - public ListenableFuture> apply(final DataCrate> data) { - final Xid xid = data.getRequestContext().getXid(); - deviceContext.hookRequestCtx(xid, data.getRequestContext()); - deviceContext.getOpenflowMessageListenerFacade().registerMultipartXid(xid.getValue()); - MultipartRequestInput multipartRequestInput = MultipartRequestInputFactory. - makeMultipartRequestInput(xid.getValue(), - version, - type); - final Future> resultFromOFLib = deviceContext.getPrimaryConnectionContext() - .getConnectionAdapter().multipartRequest(multipartRequestInput); - return JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - } - } + return handleServiceCall(new Function>, ListenableFuture>>() { + @Override + public ListenableFuture> apply(final RequestContext> requestContext) { + final Xid xid = requestContext.getXid(); + final DeviceContext deviceContext = getDeviceContext(); + final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector(); + + multiMsgCollector.registerMultipartXid(xid.getValue()); + MultipartRequestInput multipartRequestInput = MultipartRequestInputFactory. + makeMultipartRequestInput(xid.getValue(), + getVersion(), + type); + final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider(); + final SettableFuture> settableFuture = SettableFuture.create(); + outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback() { + @Override + public void onSuccess(final OfHeader ofHeader) { + if (ofHeader instanceof MultipartReply) { + final MultipartReply multipartReply = (MultipartReply) ofHeader; + settableFuture.set(RpcResultBuilder.success().build()); + multiMsgCollector.addMultipartMsg(multipartReply); + } else { + if (null != ofHeader) { + LOG.info("Unexpected response type received {}.", ofHeader.getClass()); + } else { + LOG.info("Ofheader was null."); + } + } + } + + @Override + public void onFailure(final Throwable throwable) { + RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage()); + getDeviceContext().unhookRequestCtx(requestContext.getXid()); + RequestContextUtil.closeRequstContext(requestContext); + + settableFuture.set(rpcResultBuilder.build()); + } + }); + return settableFuture; + } + } ); }