package org.opendaylight.openflowplugin.impl.statistics.services.dedicated;
import com.google.common.base.Function;
-import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
import org.opendaylight.openflowplugin.impl.services.CommonService;
-import org.opendaylight.openflowplugin.impl.services.DataCrate;
+import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public Future<RpcResult<List<MultipartReply>>> getStatisticsOfType(final MultipartType type) {
- return handleServiceCall(
- PRIMARY_CONNECTION, new Function<DataCrate<List<MultipartReply>>, ListenableFuture<RpcResult<Void>>>() {
- @Override
- public ListenableFuture<RpcResult<Void>> apply(final DataCrate<List<MultipartReply>> data) {
+ return handleServiceCall(new Function<RequestContext<List<MultipartReply>>, ListenableFuture<RpcResult<Void>>>() {
+ @Override
+ public ListenableFuture<RpcResult<Void>> apply(final RequestContext<List<MultipartReply>> requestContext) {
+ final Xid xid = requestContext.getXid();
+ final DeviceContext deviceContext = getDeviceContext();
+ final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector();
- LOG.info("Calling multipart request for type {}", type);
- final Xid xid = deviceContext.getNextXid();
- data.getRequestContext().setXid(xid);
- deviceContext.getAnyMessageTypeListener().registerMultipartXid(xid.getValue());
- MultipartRequestInput multipartRequestInput = MultipartRequestInputFactory.
- makeMultipartRequestInput(xid.getValue(),
- version,
- type);
- final Future<RpcResult<Void>> resultFromOFLib = deviceContext.getPrimaryConnectionContext()
- .getConnectionAdapter().multipartRequest(multipartRequestInput);
- return JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
- }
- }
+ multiMsgCollector.registerMultipartXid(xid.getValue());
+ MultipartRequestInput multipartRequestInput = MultipartRequestInputFactory.
+ makeMultipartRequestInput(xid.getValue(),
+ getVersion(),
+ type);
+ final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider();
+ final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+ outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
+ @Override
+ public void onSuccess(final OfHeader ofHeader) {
+ if (ofHeader instanceof MultipartReply) {
+ final MultipartReply multipartReply = (MultipartReply) ofHeader;
+ settableFuture.set(RpcResultBuilder.<Void>success().build());
+ multiMsgCollector.addMultipartMsg(multipartReply);
+ } else {
+ if (null != ofHeader) {
+ LOG.info("Unexpected response type received {}.", ofHeader.getClass());
+ } else {
+ LOG.info("Ofheader was null.");
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
+ getDeviceContext().unhookRequestCtx(requestContext.getXid());
+ RequestContextUtil.closeRequstContext(requestContext);
+
+ settableFuture.set(rpcResultBuilder.build());
+ }
+ });
+ return settableFuture;
+ }
+ }
);
}