FlowCapableTransactionServiceImpl and SalEchoServiceImpl.java shifted to new OFJ... 89/20589/3
authorMartin Bobak <mbobak@cisco.com>
Sat, 16 May 2015 10:10:35 +0000 (12:10 +0200)
committerMartin Bobak <mbobak@cisco.com>
Sat, 16 May 2015 11:18:43 +0000 (13:18 +0200)
Change-Id: I3787c1c8ed16a2ebabdc26425ea8f4e823c72cc8
Signed-off-by: Martin Bobak <mbobak@cisco.com>
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/FlowCapableTransactionServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalEchoServiceImpl.java

index 6c369d55cff9c4e6fb90eca168787f5c288c1d39..c53dbc22ec481496bfd415f17c3cd7ac779bd2db 100644 (file)
@@ -7,20 +7,21 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
-import org.opendaylight.openflowplugin.impl.callback.SuccessCallback;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.SendBarrierInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -44,35 +45,33 @@ public class FlowCapableTransactionServiceImpl extends CommonService implements
         final DeviceContext deviceContext = getDeviceContext();
 
         final BarrierInputBuilder barrierInputOFJavaBuilder = new BarrierInputBuilder();
+        final Xid xid = requestContext.getXid();
         barrierInputOFJavaBuilder.setVersion(getVersion());
-        barrierInputOFJavaBuilder.setXid(requestContext.getXid().getValue());
+        barrierInputOFJavaBuilder.setXid(xid.getValue());
 
         LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
         deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
 
-        final BarrierInput barrierInputOFJava = barrierInputOFJavaBuilder.build();
-
-        // FIXME: should be submitted through OutboundQueue
-        final Future<RpcResult<BarrierOutput>> barrierOutputOFJava = getPrimaryConnectionAdapter()
-                .barrier(barrierInputOFJava);
-        LOG.debug("Barrier with xid {} was sent from controller.", requestContext.getXid());
-
-        ListenableFuture<RpcResult<BarrierOutput>> listenableBarrierOutputOFJava = JdkFutureAdapters
-                .listenInPoolThread(barrierOutputOFJava);
+        final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider();
+        final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+        outboundQueue.commitEntry(xid.getValue(), barrierInputOFJavaBuilder.build(), new FutureCallback<OfHeader>() {
+            @Override
+            public void onSuccess(final OfHeader ofHeader) {
+                RequestContextUtil.closeRequstContext(requestContext);
+                getDeviceContext().unhookRequestCtx(requestContext.getXid());
+                getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
 
-        // callback on OF JAVA future
-        SuccessCallback<BarrierOutput, Void> successCallback = new SuccessCallback<BarrierOutput, Void>(
-                deviceContext, requestContext, listenableBarrierOutputOFJava) {
+                settableFuture.set(RpcResultBuilder.<Void>success().build());
+            }
 
             @Override
-            public RpcResult<Void> transform(final RpcResult<BarrierOutput> rpcResult) {
-                //no transformation, because output for request context is Void
-                LOG.debug("Barrier reply with xid {} was obtained by controller.", rpcResult.getResult().getXid());
-                return RpcResultBuilder.<Void>success().build();
+            public void onFailure(final Throwable throwable) {
+                RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
+                RequestContextUtil.closeRequstContext(requestContext);
+                getDeviceContext().unhookRequestCtx(requestContext.getXid());
+                settableFuture.set(rpcResultBuilder.build());
             }
-        };
-        Futures.addCallback(listenableBarrierOutputOFJava, successCallback);
-
-        return requestContext.getFuture();
+        });
+        return settableFuture;
     }
 }
index 819e9f972f070a04f47c0c83b7781dbf91cabf5f..f29da01db34eae2df3b41b3a4632b0cff25c4240 100644 (file)
@@ -7,22 +7,23 @@
  */
 package org.opendaylight.openflowplugin.impl.services;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
-import org.opendaylight.openflowplugin.impl.callback.SuccessCallback;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SalEchoService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SendEchoInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SendEchoOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SendEchoOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
@@ -48,7 +49,8 @@ public class SalEchoServiceImpl extends CommonService implements SalEchoService
         final DeviceContext deviceContext = getDeviceContext();
 
         LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
-        deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+        final Xid xid = requestContext.getXid();
+        deviceContext.hookRequestCtx(xid , requestContext);
 
         final EchoInputBuilder echoInputOFJavaBuilder = new EchoInputBuilder();
         echoInputOFJavaBuilder.setVersion(getVersion());
@@ -56,30 +58,28 @@ public class SalEchoServiceImpl extends CommonService implements SalEchoService
         echoInputOFJavaBuilder.setData(sendEchoInput.getData());
         final EchoInput echoInputOFJava = echoInputOFJavaBuilder.build();
 
-        // FIXME: should be submitted via OutboundQueue
-        final Future<RpcResult<EchoOutput>> rpcEchoOutputOFJava = getPrimaryConnectionAdapter()
-                .echo(echoInputOFJava);
-        LOG.debug("Echo with xid {} was sent from controller", requestContext.getXid());
-
-        ListenableFuture<RpcResult<EchoOutput>> listenableRpcEchoOutputOFJava = JdkFutureAdapters
-                .listenInPoolThread(rpcEchoOutputOFJava);
-
-        // callback on OF JAVA future
-        SuccessCallback<EchoOutput, SendEchoOutput> successCallback = new SuccessCallback<EchoOutput, SendEchoOutput>(
-                deviceContext, requestContext, listenableRpcEchoOutputOFJava) {
+        LOG.debug("Echo with xid {} was sent from controller", xid);
 
+        final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider();
+        final SettableFuture<RpcResult<SendEchoOutput>> settableFuture = SettableFuture.create();
+        outboundQueue.commitEntry(xid.getValue(), echoInputOFJava, new FutureCallback<OfHeader>() {
             @Override
-            public RpcResult<SendEchoOutput> transform(final RpcResult<EchoOutput> rpcResult) {
-                EchoOutput echoOutputOFJava = rpcResult.getResult();
-                SendEchoOutputBuilder sendEchoOutputBuilder = new SendEchoOutputBuilder();
-                sendEchoOutputBuilder.setData(echoOutputOFJava.getData());
+            public void onSuccess(final OfHeader ofHeader) {
+                RequestContextUtil.closeRequstContext(requestContext);
+                getDeviceContext().unhookRequestCtx(requestContext.getXid());
+                getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
 
-                LOG.debug("Echo with xid {} was received by controller.", rpcResult.getResult().getXid());
-                return RpcResultBuilder.success(sendEchoOutputBuilder.build()).build();
+                settableFuture.set(RpcResultBuilder.<SendEchoOutput>success().build());
             }
-        };
-        Futures.addCallback(listenableRpcEchoOutputOFJava, successCallback);
 
-        return requestContext.getFuture();
+            @Override
+            public void onFailure(final Throwable throwable) {
+                RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
+                RequestContextUtil.closeRequstContext(requestContext);
+                getDeviceContext().unhookRequestCtx(requestContext.getXid());
+                settableFuture.set(rpcResultBuilder.build());
+            }
+        });
+        return settableFuture;
     }
 }