Fix RpcResultBuilder/RpcContext raw references
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / services / dedicated / StatisticsGatheringService.java
index b4bf254a7e5eabd6ddd86a9a483a4bab7e6181ed..7a5685327c93a1e3724590b2d65dd5634f591fcb 100644 (file)
@@ -9,20 +9,27 @@
 package org.opendaylight.openflowplugin.impl.statistics.services.dedicated;
 
 import com.google.common.base.Function;
-import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
 import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
 import org.opendaylight.openflowplugin.impl.services.CommonService;
-import org.opendaylight.openflowplugin.impl.services.DataCrate;
+import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,22 +47,48 @@ public class StatisticsGatheringService extends CommonService {
 
 
     public Future<RpcResult<List<MultipartReply>>> getStatisticsOfType(final MultipartType type) {
-        return handleServiceCall(
-                PRIMARY_CONNECTION, new Function<DataCrate<List<MultipartReply>>, ListenableFuture<RpcResult<Void>>>() {
-                    @Override
-                    public ListenableFuture<RpcResult<Void>> apply(final DataCrate<List<MultipartReply>> data) {
-                        final Xid xid = data.getRequestContext().getXid();
-                        deviceContext.hookRequestCtx(xid, data.getRequestContext());
-                        deviceContext.getOpenflowMessageListenerFacade().registerMultipartXid(xid.getValue());
-                        MultipartRequestInput multipartRequestInput = MultipartRequestInputFactory.
-                                makeMultipartRequestInput(xid.getValue(),
-                                        version,
-                                        type);
-                        final Future<RpcResult<Void>> resultFromOFLib = deviceContext.getPrimaryConnectionContext()
-                                .getConnectionAdapter().multipartRequest(multipartRequestInput);
-                        return JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
-                    }
-                }
+        return handleServiceCall(new Function<RequestContext<List<MultipartReply>>, ListenableFuture<RpcResult<Void>>>() {
+                                     @Override
+                                     public ListenableFuture<RpcResult<Void>> apply(final RequestContext<List<MultipartReply>> requestContext) {
+                                         final Xid xid = requestContext.getXid();
+                                         final DeviceContext deviceContext = getDeviceContext();
+                                         final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector();
+
+                                         multiMsgCollector.registerMultipartXid(xid.getValue());
+                                         MultipartRequestInput multipartRequestInput = MultipartRequestInputFactory.
+                                                 makeMultipartRequestInput(xid.getValue(),
+                                                         getVersion(),
+                                                         type);
+                                         final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider();
+                                         final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+                                         outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
+                                             @Override
+                                             public void onSuccess(final OfHeader ofHeader) {
+                                                 if (ofHeader instanceof MultipartReply) {
+                                                     final MultipartReply multipartReply = (MultipartReply) ofHeader;
+                                                     settableFuture.set(RpcResultBuilder.<Void>success().build());
+                                                     multiMsgCollector.addMultipartMsg(multipartReply);
+                                                 } else {
+                                                     if (null != ofHeader) {
+                                                         LOG.info("Unexpected response type received {}.", ofHeader.getClass());
+                                                     } else {
+                                                         LOG.info("Ofheader was null.");
+                                                     }
+                                                 }
+                                             }
+
+                                             @Override
+                                             public void onFailure(final Throwable throwable) {
+                                                 RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
+                                                 getDeviceContext().unhookRequestCtx(requestContext.getXid());
+                                                 RequestContextUtil.closeRequstContext(requestContext);
+
+                                                 settableFuture.set(rpcResultBuilder.build());
+                                             }
+                                         });
+                                         return settableFuture;
+                                     }
+                                 }
 
         );
     }