Bug 1588 - OFConstants.java moved to openflowplugin-api module
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / MessageDispatchServiceImpl.java
index 0f127e83389f17a755aea4e92d2c9c149914d3dd..6db8c7739e7c7e92690f6226d2fb19c21b749351 100644 (file)
@@ -7,13 +7,16 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.core.session;
 
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.Future;
-
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowplugin.ConnectionException;
+import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
@@ -23,28 +26,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.Upd
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.*;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -52,28 +34,31 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
 
 /**
  * message dispatch service to send the message to switch.
  *
  * @author AnilGujele
- *
  */
 public class MessageDispatchServiceImpl implements IMessageDispatchService {
 
     private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
+    private static final String CONNECTION_ERROR_MESSAGE = "Session for the cookie is invalid. Reason: "
+            + "the switch has been recently disconnected OR inventory provides outdated information.";
 
-    private SessionContext session;    
+    private SessionContext session;
 
     /**
      * constructor
      *
-     * @param session
-     *            - MessageDispatchService for this session
+     * @param session - MessageDispatchService for this session
      */
     public MessageDispatchServiceImpl(SessionContext session) {
-        this.session = session;        
+        this.session = session;
     }
 
     /**
@@ -81,16 +66,14 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService {
      *
      * @param cookie to identify the right connection, it can be null also.
      * @return connectionAdapter associated with cookie, otherwise return best
-     *         suitable connection.
-     *
+     * suitable connection.
      */
 
-    private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) {
+    private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) throws ConnectionException {
 
         if (!session.isValid()) {
             LOG.warn("Session for the cookie {} is invalid.", cookie);
-            throw new IllegalArgumentException("Session for the cookie is invalid. Reason: "
-                    + "the switch has been recently disconnected OR inventory provides outdated information.");
+            throw new ConnectionException(CONNECTION_ERROR_MESSAGE);
         }
         LOG.debug("finding connecton for cookie value {}. ", cookie);
         // set main connection as default
@@ -109,145 +92,254 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService {
     }
 
     @Override
-    public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {  
-        return getConnectionAdapter(cookie).barrier(input);
+    public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {
+        try {
+            return getConnectionAdapter(cookie).barrier(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
+    }
+
+    private <T> SettableFuture<RpcResult<T>> getRpcErrorFuture(ConnectionException e) {
+        List<RpcError> rpcErrorList = getConnectionErrorAsRpcErrors(e);
+        SettableFuture<RpcResult<T>> futureWithError = SettableFuture.create();
+        futureWithError.set(Rpcs.<T>getRpcResult(false, rpcErrorList));
+        return futureWithError;
+    }
+
+    private List<RpcError> getConnectionErrorAsRpcErrors(ConnectionException e) {
+        List<RpcError> rpcErrorList = new ArrayList<>();
+        rpcErrorList.add(RpcErrors.getRpcError(OFConstants.APPLICATION_TAG,
+                OFConstants.ERROR_TAG_TIMEOUT,
+                CONNECTION_ERROR_MESSAGE,
+                RpcError.ErrorSeverity.WARNING,
+                e.getMessage(),
+                RpcError.ErrorType.TRANSPORT,
+                e.getCause()));
+        return rpcErrorList;
     }
 
     @Override
     public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).experimenter(input);
+        try {
+            return getConnectionAdapter(cookie).experimenter(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
-    public Future<RpcResult<UpdateFlowOutput>> flowMod(FlowModInput input, SwitchConnectionDistinguisher cookie) {
+    public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
         LOG.debug("Calling OFLibrary flowMod");
-        Future<RpcResult<Void>> response = getConnectionAdapter(cookie).flowMod(input);
-
-        // Send the same Xid back to caller - MessageDrivenSwitch
-        UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();        
-        BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ;
-        flowModOutput.setTransactionId(new TransactionId(bigIntXid));
-
-        UpdateFlowOutput result = flowModOutput.build();
-        Collection<RpcError> errors = Collections.emptyList();
-        RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        // solution 1: sending directly and hooking listener to get error
-        // hookup listener to catch the possible error with no reference to returned future-object
-        LOG.debug("Returning to ModelDrivenSwitch for flowMod RPC");
-        return Futures.immediateFuture(rpcResult);
+        Future<RpcResult<Void>> response = null;
+        try {
+            response = getConnectionAdapter(cookie).flowMod(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
 
+        // appending xid
+        ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
+                JdkFutureAdapters.listenInPoolThread(response),
+                new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>>() {
+
+                    @Override
+                    public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
+                        UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
+                        BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+                        flowModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+                        UpdateFlowOutput result = flowModOutput.build();
+                        RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(
+                                inputArg.isSuccessful(), result, inputArg.getErrors());
+                        return rpcResult;
+                    }
+                });
+
+        return xidResult;
     }
 
     @Override
     public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).getAsync(input);
+        try {
+            return getConnectionAdapter(cookie).getAsync(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
     public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).getConfig(input);
+        try {
+            return getConnectionAdapter(cookie).getConfig(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
     public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).getFeatures(input);
+        try {
+            return getConnectionAdapter(cookie).getFeatures(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
     public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
-            SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).getQueueConfig(input);
+                                                                  SwitchConnectionDistinguisher cookie) {
+        try {
+            return getConnectionAdapter(cookie).getQueueConfig(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
-    public Future<RpcResult<UpdateGroupOutput>> groupMod(GroupModInput input, SwitchConnectionDistinguisher cookie) {        
+    public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {
         LOG.debug("Calling OFLibrary groupMod");
-        Future<RpcResult<Void>> response = getConnectionAdapter(cookie).groupMod(input);
-
-        // Send the same Xid back to caller - MessageDrivenSwitch
-        UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();      
-        BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
-        groupModOutput.setTransactionId(new TransactionId(bigIntXid));
-       
-        UpdateGroupOutput result = groupModOutput.build();
-        Collection<RpcError> errors = Collections.emptyList();
-        RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        // solution 1: sending directly and hooking listener to get error
-        // hookup listener to catch the possible error with no reference to returned future-object
-        LOG.debug("Returning to ModelDrivenSwitch for groupMod RPC");
-        return Futures.immediateFuture(rpcResult);
+        Future<RpcResult<Void>> response = null;
+        try {
+            response = getConnectionAdapter(cookie).groupMod(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
 
+        // appending xid
+        ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
+                JdkFutureAdapters.listenInPoolThread(response),
+                new Function<RpcResult<Void>, RpcResult<UpdateGroupOutput>>() {
+
+                    @Override
+                    public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
+                        UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
+                        BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+                        groupModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+                        UpdateGroupOutput result = groupModOutput.build();
+                        RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(
+                                inputArg.isSuccessful(), result, inputArg.getErrors());
+                        return rpcResult;
+                    }
+                });
+
+        return xidResult;
     }
 
     @Override
-    public Future<RpcResult<UpdateMeterOutput>> meterMod(MeterModInput input, SwitchConnectionDistinguisher cookie) {
+    public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
         LOG.debug("Calling OFLibrary meterMod");
-        Future<RpcResult<Void>> response = getConnectionAdapter(cookie).meterMod(input);
-
-        // Send the same Xid back to caller - MessageDrivenSwitch
-        UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();       
-        BigInteger bigIntXid =BigInteger.valueOf(input.getXid());
-        meterModOutput.setTransactionId(new TransactionId(bigIntXid));
-        
-        UpdateMeterOutput result = meterModOutput.build();
-        Collection<RpcError> errors = Collections.emptyList();
-        RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        // solution 1: sending directly and hooking listener to get error
-        // hookup listener to catch the possible error with no reference to returned future-object
-        LOG.debug("Returning to ModelDrivenSwitch for meterMod RPC");
-        return Futures.immediateFuture(rpcResult);
+        Future<RpcResult<Void>> response = null;
+        try {
+            response = getConnectionAdapter(cookie).meterMod(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
 
+        // appending xid
+        ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
+                JdkFutureAdapters.listenInPoolThread(response),
+                new Function<RpcResult<Void>, RpcResult<UpdateMeterOutput>>() {
+
+                    @Override
+                    public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
+                        UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
+                        BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+                        meterModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+                        UpdateMeterOutput result = meterModOutput.build();
+                        RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(
+                                inputArg.isSuccessful(), result, inputArg.getErrors());
+                        return rpcResult;
+                    }
+                });
+
+        return xidResult;
     }
 
     @Override
     public Future<RpcResult<java.lang.Void>> multipartRequest(MultipartRequestInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).multipartRequest(input);
+        try {
+            return getConnectionAdapter(cookie).multipartRequest(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
     public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).packetOut(input);
+        try {
+            return getConnectionAdapter(cookie).packetOut(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
-    public Future<RpcResult<UpdatePortOutput>> portMod(PortModInput input, SwitchConnectionDistinguisher cookie) {
-
+    public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
         LOG.debug("Calling OFLibrary portMod");
-        Future<RpcResult<Void>> response = getConnectionAdapter(cookie).portMod(input);
-
-        // Send the same Xid back to caller - ModelDrivenSwitch
-        UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
-        String stringXid =input.getXid().toString();
-        BigInteger bigIntXid = new BigInteger( stringXid );
-        portModOutput.setTransactionId(new TransactionId(bigIntXid));
-        UpdatePortOutput result = portModOutput.build();
-        Collection<RpcError> errors = Collections.emptyList();
-        RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
+        Future<RpcResult<Void>> response = null;
+        try {
+            response = getConnectionAdapter(cookie).portMod(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
 
-        LOG.debug("Returning to ModelDrivenSwitch for portMod RPC");
-        return Futures.immediateFuture(rpcResult);
+        // appending xid
+        ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
+                JdkFutureAdapters.listenInPoolThread(response),
+                new Function<RpcResult<Void>, RpcResult<UpdatePortOutput>>() {
+
+                    @Override
+                    public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
+                        UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
+                        BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+                        portModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+                        UpdatePortOutput result = portModOutput.build();
+                        RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(
+                                inputArg.isSuccessful(), result, inputArg.getErrors());
+                        return rpcResult;
+                    }
+                });
+
+        return xidResult;
     }
 
     @Override
     public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).roleRequest(input);
+        try {
+            return getConnectionAdapter(cookie).roleRequest(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
     public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).setAsync(input);
+        try {
+            return getConnectionAdapter(cookie).setAsync(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
     public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).setConfig(input);
+        try {
+            return getConnectionAdapter(cookie).setConfig(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 
     @Override
     public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
-        return getConnectionAdapter(cookie).tableMod(input);
+        try {
+            return getConnectionAdapter(cookie).tableMod(input);
+        } catch (ConnectionException e) {
+            return getRpcErrorFuture(e);
+        }
     }
 }