BUG-956 deadlock by rpc invocation - phase2 17/7117/1
authorMichal Rehak <mirehak@cisco.com>
Thu, 8 May 2014 20:09:43 +0000 (22:09 +0200)
committerMichal Rehak <mirehak@cisco.com>
Fri, 16 May 2014 15:50:15 +0000 (17:50 +0200)
- tuning Future chaining
- introduced ListeningExecutorService
- dereference used
- clean up after bulkTransactionCache removal
- add/update meter and group refactored in order to follow future responses and run in pool

Change-Id: I5c9367512c3ab6f1799f30dad1344667fde63991
Signed-off-by: Michal Rehak <mirehak@cisco.com>
21 files changed:
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/OFConstants.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/NotificationComposer.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java [deleted file]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContextOFImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/TransactionKey.java [deleted file]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/FlowCreatorUtil.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/InventoryDataServiceUtil.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PortTranslatorUtil.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SwitchFeaturesUtilTest.java

index f125390447563577557ddaa4e06a2da11bd9a27b..bbc2786e5a4f010814e1bdaeaed19ea546485e72 100644 (file)
@@ -39,4 +39,9 @@ public class OFConstants {
     public static final int MAC_ADDRESS_LENGTH = 6;
     public static final int SIZE_OF_LONG_IN_BYTES = 8;
     public static final int SIGNUM_UNSIGNED = 1;
+    
+    /** RpcError application tag */
+    public static final String APPLICATION_TAG = "OPENFLOW_PLUGIN";
+    /** RpcError tag - timeout */
+    public static final String ERROR_TAG_TIMEOUT = "TIMOUT";
 }
index 5e0a6a923bfcffd117a4240b69337197d2b0996f..147bb60ed6fa330edf43288abbe23811024ef1b7 100644 (file)
@@ -8,9 +8,18 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
@@ -76,22 +85,12 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
- * @author mirehak
  *
  */
 public class MDController implements IMDController, AutoCloseable {
@@ -108,8 +107,6 @@ public class MDController implements IMDController, AutoCloseable {
     final private int OF13 = OFConstants.OFP_VERSION_1_3;
 
     private ErrorHandlerSimpleImpl errorHandler;
-    private ExecutorService rpcPool;
-
 
     /**
      * @return translator mapping
@@ -202,7 +199,8 @@ public class MDController implements IMDController, AutoCloseable {
         
         // prepare worker pool for rpc
         // TODO: get size from configSubsystem
-        OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10));
+        OFSessionUtil.getSessionManager().setRpcPool(
+                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)));
         
     }
 
index 6266e22ef194be59d8d4def841b3083d1feafbb4..c876917f28c8e9e97ed97bdb1a175f84bc1f9484 100644 (file)
@@ -11,6 +11,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
@@ -93,4 +95,32 @@ public abstract class MessageFactory {
         }
         return result;
     }
+    
+    /**
+     * @param ofVersion 
+     * @param ofXid 
+     * @return barrier message
+     */
+    public static BarrierInput createBarrier(short ofVersion, long ofXid) {
+        BarrierInputBuilder barrierInput = new BarrierInputBuilder();
+        barrierInput.setVersion(ofVersion);
+        barrierInput.setXid(ofXid);
+        return barrierInput.build();
+    }
+    
+//    /**
+//     * @param input
+//     * @param cookie
+//     * @param session
+//     * @param messageService 
+//     * @return barrier result
+//     */
+//    public static Future<RpcResult<BarrierOutput>> sendBarrier(
+//            SwitchConnectionDistinguisher cookie, SessionContext session, 
+//            IMessageDispatchService messageService) {
+//        BarrierInputBuilder barrierInput = new BarrierInputBuilder();
+//        barrierInput.setVersion(session.getFeatures().getVersion());
+//        barrierInput.setXid(session.getNextXid());
+//        return messageService.barrier(barrierInput.build(), cookie);
+//    }
 }
index 4b57aac794c2557a53778ba3dfe615fc3bcb65f1..de5883e445534cee7fe440102eb0119dc4deb8ed 100644 (file)
@@ -30,7 +30,6 @@ import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.match.Matc
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
 import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
@@ -64,8 +63,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.p
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemovedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput;
@@ -90,8 +87,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemovedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput;
@@ -182,6 +177,7 @@ import org.slf4j.Logger;
 import com.google.common.base.Objects;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * RPC implementation of MD-switch
@@ -192,22 +188,28 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     private final NodeId nodeId;
     private final IMessageDispatchService messageService;
     private short version = 0;
-    private final SessionContext session;
-    NotificationProviderService rpcNotificationProviderService;
-    private OFRpcTaskHelper rpcTaskHelper;
+    private NotificationProviderService rpcNotificationProviderService;
+    private OFRpcTaskContext rpcTaskContext;
     
     // TODO:read timeout from configSubsystem
     protected long maxTimeout = 1000;
     protected TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
     
-    protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext context) {
-        super(identifier, context);
+    protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, 
+            final SessionContext sessionContext) {
+        super(identifier, sessionContext);
         this.nodeId = nodeId;
         messageService = sessionContext.getMessageDispatchService();
-        version = context.getPrimaryConductor().getVersion();
-        this.session = context;
+        version = sessionContext.getPrimaryConductor().getVersion();
         rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService();
-        rpcTaskHelper = new OFRpcTaskHelper(messageService, context, rpcNotificationProviderService);
+        
+        rpcTaskContext = new OFRpcTaskContext();
+        rpcTaskContext.setSession(sessionContext);
+        rpcTaskContext.setMessageService(messageService);
+        rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService);
+        rpcTaskContext.setMaxTimeout(maxTimeout);
+        rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
+        rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool());
     }
 
     @Override
@@ -217,11 +219,10 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         SwitchConnectionDistinguisher cookie = null;
         
         OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = 
-                OFRpcTaskFactory.createAddFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
-        rpcTaskHelper.initTask(task, input, cookie);
-        OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+                OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit();
         
-        return Futures.transform(JdkFutureAdapters.listenInPoolThread(task.getResult()), 
+        return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), 
                 OFRpcFutureResultTransformFactory.createForAddFlowOutput());
     }
 
@@ -229,108 +230,31 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     @Override
     public Future<RpcResult<AddGroupOutput>> addGroup(final AddGroupInput input) {
         LOG.debug("Calling the GroupMod RPC method on MessageDispatchService");
-        Long xId = null;
-
+        
         // use primary connection
         SwitchConnectionDistinguisher cookie = null;
         
-        if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the AddGroupInput to GroupModInput
-        GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext()
-                .getFeatures().getDatapathId());
-        xId = session.getNextXid();
-        ofGroupModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            GroupAddedBuilder groupMod = new GroupAddedBuilder(
-                    (org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group) input);
-            groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            groupMod.setGroupRef(input.getGroupRef());
-            rpcNotificationProviderService.publish(groupMod.build());
-        }
-
-        Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie);
-        RpcResult<UpdateGroupOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for AddGroup RPC" + ex.getMessage());
-        }
-
-        UpdateGroupOutput updateGroupOutput = rpcResultFromOFLib.getResult();
-
-        AddGroupOutputBuilder addGroupOutput = new AddGroupOutputBuilder();
-        addGroupOutput.setTransactionId(updateGroupOutput.getTransactionId());
-        AddGroupOutput result = addGroupOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<AddGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Add Group RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task = 
+                OFRpcTaskFactory.createAddGroupTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateGroupOutput>> result = task.submit();
+        
+        return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), 
+                OFRpcFutureResultTransformFactory.createForAddGroupOutput());
     }
 
     @Override
     public Future<RpcResult<AddMeterOutput>> addMeter(final AddMeterInput input) {
         LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
-        Long xId = null;
-        // For Meter provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
-
+        
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
-        if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the AddMeterInput to MeterModInput
-        MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version);
-        xId = session.getNextXid();
-        ofMeterModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            MeterAddedBuilder meterMod = new MeterAddedBuilder(
-                    (org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter) input);
-            meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            meterMod.setMeterRef(input.getMeterRef());
-            rpcNotificationProviderService.publish(meterMod.build());
-        }
-
-        Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie);
-
-        RpcResult<UpdateMeterOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for AddMeter RPC" + ex.getMessage());
-        }
-
-        UpdateMeterOutput updateMeterOutput = rpcResultFromOFLib.getResult();
-
-        AddMeterOutputBuilder addMeterOutput = new AddMeterOutputBuilder();
-        addMeterOutput.setTransactionId(updateMeterOutput.getTransactionId());
-        AddMeterOutput result = addMeterOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<AddMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Add Meter RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        
+        OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task = 
+                OFRpcTaskFactory.createAddMeterTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateMeterOutput>> result = task.submit();
+        
+        return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), 
+                OFRpcFutureResultTransformFactory.createForAddMeterOutput());
     }
 
     @Override
@@ -344,7 +268,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         SwitchConnectionDistinguisher cookie = null;
         if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
             BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            xId = session.getNextXid();
+            xId = sessionContext.getNextXid();
             barrierInput.setXid(xId);
             barrierInput.setVersion(version);
             @SuppressWarnings("unused")
@@ -352,9 +276,9 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         }
 
         // Convert the RemoveFlowInput to FlowModInput
-        FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, this.getSessionContext()
+        FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, sessionContext
                 .getFeatures().getDatapathId());
-        xId = session.getNextXid();
+        xId = sessionContext.getNextXid();
         ofFlowModInput.setXid(xId);
 
         if (null != rpcNotificationProviderService) {
@@ -399,7 +323,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
         SwitchConnectionDistinguisher cookie = null;
         if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
+            xId = sessionContext.getNextXid();
             BarrierInputBuilder barrierInput = new BarrierInputBuilder();
             barrierInput.setVersion(version);
             barrierInput.setXid(xId);
@@ -410,7 +334,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         // Convert the RemoveGroupInput to GroupModInput
         GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext()
                 .getFeatures().getDatapathId());
-        xId = session.getNextXid();
+        xId = sessionContext.getNextXid();
         ofGroupModInput.setXid(xId);
 
         if (null != rpcNotificationProviderService) {
@@ -454,7 +378,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         // the request can be routed through any connection to the switch
         SwitchConnectionDistinguisher cookie = null;
         if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
+            xId = sessionContext.getNextXid();
             BarrierInputBuilder barrierInput = new BarrierInputBuilder();
             barrierInput.setVersion(version);
             barrierInput.setXid(xId);
@@ -464,7 +388,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
         // Convert the RemoveMeterInput to MeterModInput
         MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version);
-        xId = session.getNextXid();
+        xId = sessionContext.getNextXid();
         ofMeterModInput.setXid(xId);
 
         if (null != rpcNotificationProviderService) {
@@ -548,119 +472,38 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         SwitchConnectionDistinguisher cookie = null;
         
         OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task = 
-                OFRpcTaskFactory.createUpdateFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
-        rpcTaskHelper.initTask(task, input, cookie);
-        OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+                OFRpcTaskFactory.createUpdateFlowTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit();
         
-        return task.getResult();
+        return result;
     }
 
     @Override
     public Future<RpcResult<UpdateGroupOutput>> updateGroup(final UpdateGroupInput input) {
         LOG.debug("Calling the update Group Mod RPC method on MessageDispatchService");
-        Long xId = null;
-
-        // For Flow provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
-
+        
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
-        if (Objects.firstNonNull(input.getUpdatedGroup().isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the UpdateGroupInput to GroupModInput
-        GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input.getUpdatedGroup(), version, this
-                .getSessionContext().getFeatures().getDatapathId());
-        xId = session.getNextXid();
-        ofGroupModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
-            groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            groupMod.setGroupRef(input.getGroupRef());
-            rpcNotificationProviderService.publish(groupMod.build());
-        }
-
-        Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie);
-
-        RpcResult<UpdateGroupOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for updateGroup RPC" + ex.getMessage());
-        }
-
-        UpdateGroupOutput updateGroupOutputOFLib = rpcResultFromOFLib.getResult();
-
-        UpdateGroupOutputBuilder updateGroupOutput = new UpdateGroupOutputBuilder();
-        updateGroupOutput.setTransactionId(updateGroupOutputOFLib.getTransactionId());
-        UpdateGroupOutput result = updateGroupOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Update Group RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        
+        OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task = 
+                OFRpcTaskFactory.createUpdateGroupTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateGroupOutput>> result = task.submit();
+        
+        return result;
     }
 
     @Override
     public Future<RpcResult<UpdateMeterOutput>> updateMeter(final UpdateMeterInput input) {
         LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
-        Long xId = null;
-
-        // For Meter provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
+        
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
-        if (Objects.firstNonNull(input.getUpdatedMeter().isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the UpdateMeterInput to MeterModInput
-        MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input.getUpdatedMeter(), version);
-        xId = session.getNextXid();
-        ofMeterModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
-            meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            meterMod.setMeterRef(input.getMeterRef());
-            rpcNotificationProviderService.publish(meterMod.build());
-        }
-
-        Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie);
-
-        RpcResult<UpdateMeterOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for UpdateMeter RPC" + ex.getMessage());
-        }
-
-        UpdateMeterOutput updateMeterOutputFromOFLib = rpcResultFromOFLib.getResult();
-
-        UpdateMeterOutputBuilder updateMeterOutput = new UpdateMeterOutputBuilder();
-        updateMeterOutput.setTransactionId(updateMeterOutputFromOFLib.getTransactionId());
-        UpdateMeterOutput result = updateMeterOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Update Meter RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        
+        OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task = 
+                OFRpcTaskFactory.createUpdateMeterTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateMeterOutput>> result = task.submit();
+        
+        return result;
     }
 
     @Override
@@ -1667,21 +1510,4 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         RpcResult<GetQueueStatisticsFromGivenPortOutput> rpcResult = Rpcs.getRpcResult(true, output.build(), errors);
         return Futures.immediateFuture(rpcResult);
     }
-    
-    /**
-     * @param input
-     * @param cookie
-     * @param session
-     * @param messageService 
-     * @return barrier result
-     */
-    public static Future<RpcResult<BarrierOutput>> sendBarrier(
-            SwitchConnectionDistinguisher cookie, SessionContext session, 
-            IMessageDispatchService messageService) {
-        BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-        barrierInput.setVersion(session.getFeatures().getVersion());
-        barrierInput.setXid(session.getNextXid());
-        return messageService.barrier(barrierInput.build(), cookie);
-    }
-
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/NotificationComposer.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/NotificationComposer.java
new file mode 100644 (file)
index 0000000..08e961a
--- /dev/null
@@ -0,0 +1,22 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+/**
+ * @param <N> type of notification
+ * 
+ */
+public interface NotificationComposer<N extends Notification> {
+    
+    /**
+     * @return notification instance
+     */
+    N compose();
+}
index a0bb177b2522b75ee67d5bc22a05960deb427c66..b2c1a42817d6e2befd8574019f7ad09deb6df90f 100644 (file)
@@ -13,6 +13,12 @@ import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
@@ -28,6 +34,17 @@ public abstract class OFRpcFutureResultTransformFactory {
     protected static Logger LOG = LoggerFactory
             .getLogger(OFRpcFutureResultTransformFactory.class);
 
+    /**
+     * @param input
+     * @param result
+     * @return
+     */
+    protected static <E> RpcResult<E> assembleRpcResult(RpcResult<?> input, E result) {
+        Collection<RpcError> errors = input.getErrors();
+        RpcResult<E> rpcResult = Rpcs.getRpcResult(input.isSuccessful(), result, errors);
+        return rpcResult;
+    }
+
     /**
      * @return translator from {@link UpdateFlowOutput} to {@link AddFlowOutput}
      */
@@ -35,7 +52,7 @@ public abstract class OFRpcFutureResultTransformFactory {
         return new Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>>() {
 
             @Override
-            public RpcResult<AddFlowOutput> apply(final RpcResult<UpdateFlowOutput> input) {
+            public RpcResult<AddFlowOutput> apply(RpcResult<UpdateFlowOutput> input) {
 
                 UpdateFlowOutput updateFlowOutput = input.getResult();
 
@@ -51,15 +68,49 @@ public abstract class OFRpcFutureResultTransformFactory {
         };
     }
     
+    /**
+     * @return translator from {@link UpdateGroupOutput} to {@link AddGroupOutput}
+     */
+    public static Function<RpcResult<UpdateGroupOutput>, RpcResult<AddGroupOutput>> createForAddGroupOutput() {
+        return new Function<RpcResult<UpdateGroupOutput>,RpcResult<AddGroupOutput>>() {
+
+            @Override
+            public RpcResult<AddGroupOutput> apply(RpcResult<UpdateGroupOutput> input) {
+                UpdateGroupOutput updateGroupOutput = input.getResult();
+                
+                AddGroupOutputBuilder addGroupOutput = new AddGroupOutputBuilder();
+                addGroupOutput.setTransactionId(updateGroupOutput.getTransactionId());
+                AddGroupOutput result = addGroupOutput.build();
+
+                RpcResult<AddGroupOutput> rpcResult = assembleRpcResult(input, result);
+                LOG.debug("Returning the Add Group RPC result to MD-SAL");
+                return rpcResult;
+            }
+        };
+    }
     
     /**
-     * @param input
-     * @param result
-     * @return
+     * @return translator from {@link UpdateGroupOutput} to {@link AddGroupOutput}
      */
-    protected static <E> RpcResult<E> assembleRpcResult(RpcResult<?> input, E result) {
-        Collection<RpcError> errors = input.getErrors();
-        RpcResult<E> rpcResult = Rpcs.getRpcResult(input.isSuccessful(), result, errors);
-        return rpcResult;
+    public static Function<RpcResult<UpdateMeterOutput>, RpcResult<AddMeterOutput>> createForAddMeterOutput() {
+        return new Function<RpcResult<UpdateMeterOutput>,RpcResult<AddMeterOutput>>() {
+
+            @Override
+            public RpcResult<AddMeterOutput> apply(final RpcResult<UpdateMeterOutput> input) {
+                UpdateMeterOutput updateMeterOutput = input.getResult();
+                
+                AddMeterOutputBuilder addMeterOutput = new AddMeterOutputBuilder();
+                addMeterOutput.setTransactionId(updateMeterOutput.getTransactionId());
+                AddMeterOutput result = addMeterOutput.build();
+
+                RpcResult<AddMeterOutput> rpcResult = assembleRpcResult(input, result);
+                LOG.debug("Returning the Add Meter RPC result to MD-SAL");
+                return rpcResult;
+            }
+        };
     }
+    
+    
+    
+    
 }
index b153fcc420f8c13ae8cf6a4cb15342278b1262d8..79764f9dd09df8ec5e29b2c7707e0edfab7d4cc3 100644 (file)
@@ -7,38 +7,36 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.core.sal;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * @param <T> input type
  * @param <K> future output type
  */
-public abstract class OFRpcTask<T, K> implements Runnable {
+public abstract class OFRpcTask<T, K> implements Callable<ListenableFuture<K>> {
     
-    private SwitchConnectionDistinguisher cookie;
-    private IMessageDispatchService messageService;
-    private SessionContext session;
+    private OFRpcTaskContext taskContext;
     private T input;
-    private SettableFuture<K> result;
-    private NotificationProviderService rpcNotificationProviderService;
-    
-    /**
-     * @return the result
-     */
-    public SettableFuture<K> getResult() {
-        return result;
-    }
+    private SwitchConnectionDistinguisher cookie;
     
     /**
-     * @param result the result to set
+     * @param taskContext
+     * @param input 
+     * @param cookie 
      */
-    public void setResult(SettableFuture<K> result) {
-        this.result = result;
+    public OFRpcTask(OFRpcTaskContext taskContext, SwitchConnectionDistinguisher cookie, T input) {
+        this.taskContext = taskContext;
+        this.cookie = cookie;
+        this.input = input;
     }
 
     /**
@@ -49,73 +47,86 @@ public abstract class OFRpcTask<T, K> implements Runnable {
     }
 
     /**
-     * @return the messageService
+     * @param cookie the cookie to set
      */
-    public IMessageDispatchService getMessageService() {
-        return messageService;
+    public void setCookie(SwitchConnectionDistinguisher cookie) {
+        this.cookie = cookie;
     }
 
     /**
-     * @return the session
+     * @return the input
      */
-    public SessionContext getSession() {
-        return session;
+    public T getInput() {
+        return input;
     }
-    
+
     /**
-     * @return protocol version
+     * @param input the input to set
      */
-    public Short getVersion() {
-        return session.getFeatures().getVersion();
+    public void setInput(T input) {
+        this.input = input;
     }
 
     /**
-     * @param cookie the cookie to set
+     * @return the rpcNotificationProviderService
      */
-    public void setCookie(SwitchConnectionDistinguisher cookie) {
-        this.cookie = cookie;
+    public NotificationProviderService getRpcNotificationProviderService() {
+        return taskContext.getRpcNotificationProviderService();
     }
 
     /**
-     * @param messageService the messageService to set
+     * @return message service
+     * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMessageService()
      */
-    public void setMessageService(IMessageDispatchService messageService) {
-        this.messageService = messageService;
+    public IMessageDispatchService getMessageService() {
+        return taskContext.getMessageService();
     }
 
     /**
-     * @param session the session to set
+     * @return session
+     * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getSession()
      */
-    public void setSession(SessionContext session) {
-        this.session = session;
+    public SessionContext getSession() {
+        return taskContext.getSession();
     }
 
     /**
-     * @return the input
+     * @return max timeout
+     * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMaxTimeout()
      */
-    public T getInput() {
-        return input;
+    public long getMaxTimeout() {
+        return taskContext.getMaxTimeout();
     }
 
     /**
-     * @param input the input to set
+     * @return time unit for max timeout
+     * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMaxTimeoutUnit()
      */
-    public void setInput(T input) {
-        this.input = input;
+    public TimeUnit getMaxTimeoutUnit() {
+        return taskContext.getMaxTimeoutUnit();
     }
-
+    
     /**
-     * @param rpcNotificationProviderService
+     * @return protocol version
      */
-    public void setRpcNotificationProviderService(
-            NotificationProviderService rpcNotificationProviderService) {
-                this.rpcNotificationProviderService = rpcNotificationProviderService;
+    public Short getVersion() {
+        return taskContext.getSession().getFeatures().getVersion();
+        
     }
     
     /**
-     * @return the rpcNotificationProviderService
+     * @return the taskContext
      */
-    public NotificationProviderService getRpcNotificationProviderService() {
-        return rpcNotificationProviderService;
+    public OFRpcTaskContext getTaskContext() {
+        return taskContext;
+    }
+    
+    /**
+     * submit task into rpc worker pool
+     * @return future result of task 
+     */
+    public ListenableFuture<K> submit() {
+        ListenableFuture<ListenableFuture<K>> compoundResult = getTaskContext().getRpcPool().submit(this);
+        return Futures.dereference(compoundResult);
     }
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java
new file mode 100644 (file)
index 0000000..e96da2b
--- /dev/null
@@ -0,0 +1,104 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+/**
+ * 
+ */
+public class OFRpcTaskContext {
+
+    private IMessageDispatchService messageService;
+    private SessionContext session;
+    private NotificationProviderService rpcNotificationProviderService;
+    private long maxTimeout;
+    private TimeUnit maxTimeoutUnit;
+    private ListeningExecutorService rpcPool;
+    
+    /**
+     * @return the messageService
+     */
+    public IMessageDispatchService getMessageService() {
+        return messageService;
+    }
+    /**
+     * @param messageService the messageService to set
+     */
+    public void setMessageService(IMessageDispatchService messageService) {
+        this.messageService = messageService;
+    }
+    /**
+     * @return the session
+     */
+    public SessionContext getSession() {
+        return session;
+    }
+    /**
+     * @param session the session to set
+     */
+    public void setSession(SessionContext session) {
+        this.session = session;
+    }
+    /**
+     * @return the rpcNotificationProviderService
+     */
+    public NotificationProviderService getRpcNotificationProviderService() {
+        return rpcNotificationProviderService;
+    }
+    /**
+     * @param rpcNotificationProviderService the rpcNotificationProviderService to set
+     */
+    public void setRpcNotificationProviderService(
+            NotificationProviderService rpcNotificationProviderService) {
+        this.rpcNotificationProviderService = rpcNotificationProviderService;
+    }
+    /**
+     * @return the maxTimeout
+     */
+    public long getMaxTimeout() {
+        return maxTimeout;
+    }
+    /**
+     * @param maxTimeout the maxTimeout to set
+     */
+    public void setMaxTimeout(long maxTimeout) {
+        this.maxTimeout = maxTimeout;
+    }
+    /**
+     * @return the maxTimeoutUnit
+     */
+    public TimeUnit getMaxTimeoutUnit() {
+        return maxTimeoutUnit;
+    }
+    /**
+     * @param maxTimeoutUnit the maxTimeoutUnit to set
+     */
+    public void setMaxTimeoutUnit(TimeUnit maxTimeoutUnit) {
+        this.maxTimeoutUnit = maxTimeoutUnit;
+    }
+    /**
+     * @param rpcPool
+     */
+    public void setRpcPool(ListeningExecutorService rpcPool) {
+        this.rpcPool = rpcPool;
+    }
+    
+    /**
+     * @return the rpcPool
+     */
+    public ListeningExecutorService getRpcPool() {
+        return rpcPool;
+    }
+}
index 2bd8346d13962a0ca05e3e611afbea4a82529b9f..921d49421b57f3673e9697ee0282761da8e65d59 100644 (file)
 package org.opendaylight.openflowplugin.openflow.md.core.sal;
 
 import java.math.BigInteger;
+import java.util.Collection;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.MeterConvertor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInputBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
 /**
  *
  */
 public abstract class OFRpcTaskFactory {
 
     /**
-     * @param maxTimeout
-     * @param maxTimeoutUnit
-     * @param helper
+     * @param taskContext 
+     * @param input 
+     * @param cookie 
      * @return UpdateFlow task
      */
     public static OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> createAddFlowTask(
-            final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
-        OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
-                new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>() {
-
+            OFRpcTaskContext taskContext, AddFlowInput input, 
+            SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = 
+                new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
+            
             @Override
-            public void run() {
-                helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().isBarrier(), getCookie(), getResult());
-                if (getResult().isDone()) {
-                    return;
-                }
-
-                // Convert the AddFlowInput to FlowModInput
-                FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(),
-                        getVersion(), getSession().getFeatures().getDatapathId());
-                Long xId = getSession().getNextXid();
-                ofFlowModInput.setXid(xId);
-
-                if (null != getRpcNotificationProviderService()) {
-                    FlowAddedBuilder newFlow = new FlowAddedBuilder(
-                            (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput());
-                    newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-                    newFlow.setFlowRef(getInput().getFlowRef());
-                    getRpcNotificationProviderService().publish(newFlow.build());
+            public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
+                ListenableFuture<RpcResult<UpdateFlowOutput>> result = SettableFuture.create();
+                
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the AddFlowInput to FlowModInput
+                    FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), 
+                            getVersion(), getSession().getFeatures().getDatapathId());
+                    final Long xId = getSession().getNextXid();
+                    ofFlowModInput.setXid(xId);
+                    
+                    Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
+                            getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createFlowAddedNotification(xId, getInput()));
                 }
 
-                Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
-                        getMessageService().flowMod(ofFlowModInput.build(), getCookie());
-                OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+                return result;
             }
         };
+        
         return task;
     }
 
     /**
-     * @param maxTimeout
-     * @param maxTimeoutUnit
-     * @param helper
+     * @param xId
+     * @return
+     */
+    protected static NotificationComposer<FlowAdded> createFlowAddedNotification(
+            final Long xId, final AddFlowInput input) {
+        return new NotificationComposer<FlowAdded>() {
+            @Override
+            public FlowAdded compose() {
+                FlowAddedBuilder newFlow = new FlowAddedBuilder((Flow) input);
+                newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                newFlow.setFlowRef(input.getFlowRef());
+                return newFlow.build();
+            }
+        };
+    }
+
+    /**
+     * @param taskContext 
+     * @param input 
+     * @param cookie 
      * @return UpdateFlow task
      */
     public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
-            final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
-        OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
-                new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>() {
+            OFRpcTaskContext taskContext, UpdateFlowInput input, 
+            SwitchConnectionDistinguisher cookie) {
+        
+        OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task = 
+                new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
+            
+            @Override
+            public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
+                ListenableFuture<RpcResult<UpdateFlowOutput>> result = null;
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
+                        getInput().getUpdatedFlow().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the AddFlowInput to FlowModInput
+                    FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(), 
+                            getVersion(), getSession().getFeatures().getDatapathId());
+                    Long xId = getSession().getNextXid();
+                    ofFlowModInput.setXid(xId);
+    
+                    Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
+                            getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createFlowUpdatedNotification(xId, getInput()));
+                }
+                return result;
+            }
+        };
+        return task;
+    }
 
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<FlowUpdated> createFlowUpdatedNotification(
+            final Long xId, final UpdateFlowInput input) {
+        return new NotificationComposer<FlowUpdated>() {
+            @Override
+            public FlowUpdated compose() {
+                FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
+                updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                updFlow.setFlowRef(input.getFlowRef());
+                return updFlow.build();
+            }
+        };
+    }
+    
+    /**
+     * @param taskContext
+     * @param input
+     * @param cookie
+     * @return update group task
+     */
+    public static OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> createAddGroupTask(
+            final OFRpcTaskContext taskContext, AddGroupInput input, 
+            final SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task = 
+                new OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
+            
             @Override
-            public void run() {
-                helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().getUpdatedFlow().isBarrier(), getCookie(), getResult());
-                if (getResult().isDone()) {
-                    return;
+            public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
+                ListenableFuture<RpcResult<UpdateGroupOutput>> result = SettableFuture.create();
+                
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the AddGroupInput to GroupModInput
+                    GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), 
+                            getVersion(), getSession().getFeatures().getDatapathId());
+                    final Long xId = getSession().getNextXid();
+                    ofGroupModInput.setXid(xId);
+                    
+                    Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = getMessageService()
+                            .groupMod(ofGroupModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createGroupAddedNotification(xId, getInput()));
                 }
 
-                // Convert the AddFlowInput to FlowModInput
-                FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(),
-                        getVersion(), getSession().getFeatures().getDatapathId());
-                Long xId = getSession().getNextXid();
-                ofFlowModInput.setXid(xId);
+                return result;
+            }
+        };
+        
+        return task;
+    }
+    
 
-                if (null != getRpcNotificationProviderService()) {
-                    FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(getInput().getUpdatedFlow());
-                    updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-                    updFlow.setFlowRef(getInput().getFlowRef());
-                    getRpcNotificationProviderService().publish(updFlow.build());
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<GroupAdded> createGroupAddedNotification(
+            final Long xId, final AddGroupInput input) {
+        return new NotificationComposer<GroupAdded>() {
+            @Override
+            public GroupAdded compose() {
+                GroupAddedBuilder groupMod = new GroupAddedBuilder((Group) input);
+                groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                groupMod.setGroupRef(input.getGroupRef());
+                return groupMod.build();
+            }
+        };
+    }
+
+    /**
+     * @param taskContext
+     * @param input
+     * @param cookie
+     * @return update meter task
+     */
+    public static OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> createAddMeterTask(
+            OFRpcTaskContext taskContext, AddMeterInput input,
+            SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task = 
+                new OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
+            
+            @Override
+            public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
+                ListenableFuture<RpcResult<UpdateMeterOutput>> result = SettableFuture.create();
+                
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the AddGroupInput to GroupModInput
+                    MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion());
+                    final Long xId = getSession().getNextXid();
+                    ofMeterModInput.setXid(xId);
+                    
+                    Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = getMessageService()
+                            .meterMod(ofMeterModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createMeterAddedNotification(xId, getInput()));
                 }
 
-                Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
-                        getMessageService().flowMod(ofFlowModInput.build(), getCookie());
-                OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+                return result;
             }
         };
+        
         return task;
+        
+    }
+
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<MeterAdded> createMeterAddedNotification(
+            final Long xId, final AddMeterInput input) {
+        return new NotificationComposer<MeterAdded>() {
+            @Override
+            public MeterAdded compose() {
+                MeterAddedBuilder meterMod = new MeterAddedBuilder((Meter) input);
+                meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                meterMod.setMeterRef(input.getMeterRef());
+                return meterMod.build();
+            }
+        };
+    }
+    
+    /**
+     * @param taskContext 
+     * @param input 
+     * @param cookie 
+     * @return UpdateFlow task
+     */
+    public static OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> createUpdateGroupTask(
+            OFRpcTaskContext taskContext, UpdateGroupInput input, 
+            SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task = 
+                new OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
+            
+            @Override
+            public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
+                ListenableFuture<RpcResult<UpdateGroupOutput>> result = null;
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
+                        getInput().getUpdatedGroup().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the UpdateGroupInput to GroupModInput
+                    GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(
+                            getInput().getUpdatedGroup(), getVersion(),
+                            getSession().getFeatures().getDatapathId());
+                    final Long xId = getSession().getNextXid();
+                    ofGroupModInput.setXid(xId);
+    
+                    Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = 
+                            getMessageService().groupMod(ofGroupModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createGroupUpdatedNotification(xId, getInput()));
+                }
+                return result;
+            }
+        };
+        return task;
+    }
+    
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<GroupUpdated> createGroupUpdatedNotification(
+            final Long xId, final UpdateGroupInput input) {
+        return new NotificationComposer<GroupUpdated>() {
+            @Override
+            public GroupUpdated compose() {
+                GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
+                groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                groupMod.setGroupRef(input.getGroupRef());
+                return groupMod.build();
+            }
+        };
     }
 
+    /**
+     * @param taskContext 
+     * @param input
+     * @param cookie
+     * @return update meter task 
+     */
+    public static OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> createUpdateMeterTask(
+            OFRpcTaskContext taskContext, UpdateMeterInput input,
+            SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task = 
+                new OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
+            
+            @Override
+            public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
+                ListenableFuture<RpcResult<UpdateMeterOutput>> result = null;
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
+                        getInput().getUpdatedMeter().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the UpdateMeterInput to MeterModInput
+                    MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(
+                            getInput().getUpdatedMeter(), getVersion());
+                    final Long xId = getSession().getNextXid();
+                    ofMeterModInput.setXid(xId);
+    
+                    Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = 
+                            getMessageService().meterMod(ofMeterModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createMeterUpdatedNotification(xId, getInput()));
+                }
+                return result;
+            }
+        };
+        return task;
+    }
+    
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<MeterUpdated> createMeterUpdatedNotification(
+            final Long xId, final UpdateMeterInput input) {
+        return new NotificationComposer<MeterUpdated>() {
+            @Override
+            public MeterUpdated compose() {
+                MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
+                meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                meterMod.setMeterRef(input.getMeterRef());
+                return meterMod.build();
+            }
+        };
+    }
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java
deleted file mode 100644 (file)
index d844a65..0000000
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * 
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowplugin.openflow.md.core.sal;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
-import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.SettableFuture;
-
-/**
- * 
- */
-public class OFRpcTaskHelper {
-
-    private IMessageDispatchService messageService;
-    private SessionContext session;
-    private NotificationProviderService rpcNotificationProviderService;
-    /**
-     * @param cookie
-     * @param messageService
-     * @param session
-     * @param rpcNotificationProviderService 
-     */
-    public OFRpcTaskHelper(IMessageDispatchService messageService, SessionContext session, 
-            NotificationProviderService rpcNotificationProviderService) {
-        this.messageService = messageService;
-        this.session = session;
-        this.rpcNotificationProviderService = rpcNotificationProviderService;
-    }
-    
-    
-    /**
-     * @param task
-     * @param input 
-     * @param cookie 
-     * @return inited task
-     */
-    public <T, K> OFRpcTask<T, K> initTask(OFRpcTask<T, K> task, T input, SwitchConnectionDistinguisher cookie) {
-        task.setMessageService(messageService);
-        task.setSession(session);
-        task.setRpcNotificationProviderService(rpcNotificationProviderService);
-        task.setResult(SettableFuture.<K>create());
-        task.setCookie(cookie);
-        task.setInput(input);
-        return task;
-    }
-    
-    /**
-     * @param intern 
-     * @param wrapper 
-     */
-    public static <K> void chainFutures(final Future<K> intern, final SettableFuture<K> wrapper) {
-        Futures.addCallback(
-                JdkFutureAdapters.listenInPoolThread(intern),
-                new FutureCallback<K>() {
-
-                    @Override
-                    public void onSuccess(
-                            K result) {
-                        wrapper.set(result);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable t) {
-                        wrapper.setException(t);
-                    }
-
-                });
-    }
-    
-    /**
-     * @param maxTimeout
-     * @param maxTimeoutUnit
-     * @param isBarrier 
-     * @param cookie 
-     * @param result 
-     */
-    public <T> void rawBarrierSend(final long maxTimeout, final TimeUnit maxTimeoutUnit, 
-            Boolean isBarrier, SwitchConnectionDistinguisher cookie, SettableFuture<RpcResult<T>> result) {
-        if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
-            Future<RpcResult<BarrierOutput>> barrierFuture = ModelDrivenSwitchImpl.sendBarrier(cookie, session, messageService);
-            try {
-                RpcResult<BarrierOutput> barrierResult = barrierFuture.get(maxTimeout, maxTimeoutUnit);
-                if (!barrierResult.isSuccessful()) {
-                    result.set(Rpcs.<T>getRpcResult(false, barrierResult.getErrors()));
-                }
-            } catch (Exception e) {
-                result.setException(e);
-            }
-        }
-    }
-}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java
new file mode 100644 (file)
index 0000000..03662fd
--- /dev/null
@@ -0,0 +1,121 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.openflowplugin.openflow.md.OFConstants;
+import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * 
+ */
+public abstract class OFRpcTaskUtil {
+
+    /**
+     * @param taskContext 
+     * @param isBarrier 
+     * @param cookie 
+     * @return rpcResult of given type, containing wrapped errors of barrier sending (if any) or success
+     */
+    public static Collection<RpcError> manageBarrier(OFRpcTaskContext taskContext, Boolean isBarrier, 
+            SwitchConnectionDistinguisher cookie) {
+        Collection<RpcError> errors = null;
+        if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
+            Future<RpcResult<BarrierOutput>> barrierFuture = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService());
+            try {
+                RpcResult<BarrierOutput> barrierResult = barrierFuture.get(
+                        taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit());
+                if (!barrierResult.isSuccessful()) {
+                    errors = barrierResult.getErrors();
+                }
+            } catch (Exception e) {
+                RpcError rpcError = RpcErrors.getRpcError(
+                        OFConstants.APPLICATION_TAG, OFConstants.ERROR_TAG_TIMEOUT, 
+                        "barrier sending failed", ErrorSeverity.WARNING, 
+                        "switch failed to respond on barrier request - message ordering is not preserved", ErrorType.RPC, e);
+                errors = Lists.newArrayList(rpcError);
+            }
+        } 
+        
+        if (errors == null) {
+            errors = Collections.emptyList();
+        }
+        
+        return errors;
+    }
+
+    /**
+     * @param session
+     * @param cookie
+     * @param messageService
+     * @return barrier response
+     */
+    private static Future<RpcResult<BarrierOutput>> sendBarrier(SessionContext session, 
+            SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) {
+        BarrierInput barrierInput = MessageFactory.createBarrier(
+                session.getFeatures().getVersion(), session.getNextXid());
+        return messageService.barrier(barrierInput, cookie);
+    }
+
+    /**
+     * @param result rpcResult with success = false, errors = given collection
+     * @param barrierErrors
+     */
+    public static <T> void wrapBarrierErrors(SettableFuture<RpcResult<T>> result,
+            Collection<RpcError> barrierErrors) {
+        result.set(Rpcs.<T>getRpcResult(false, barrierErrors));
+    }
+    
+    /**
+     * @param originalResult
+     * @param notificationProviderService
+     * @param notificationComposer lazy notification composer
+     */
+    public static <R, N extends Notification> void hookFutureNotification(ListenableFuture<R> originalResult, 
+            final NotificationProviderService notificationProviderService, 
+            final NotificationComposer<N> notificationComposer) {
+        Futures.addCallback(originalResult, new FutureCallback<R>() {
+            @Override
+            public void onSuccess(R result) {
+                if (null != notificationProviderService) {
+                    notificationProviderService.publish(notificationComposer.compose());
+                }
+            }
+            
+            @Override
+            public void onFailure(Throwable t) {
+                //NOOP
+            }
+        });
+        
+    }
+
+}
index 0f127e83389f17a755aea4e92d2c9c149914d3dd..610e7d340b4350a9e2c5f0cb37999f2fd2fa386f 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.openflowplugin.openflow.md.core.session;
 
 import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.Future;
 
 import org.opendaylight.controller.sal.common.util.Rpcs;
@@ -47,12 +45,14 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * message dispatch service to send the message to switch.
@@ -119,24 +119,29 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService {
     }
 
     @Override
-    public Future<RpcResult<UpdateFlowOutput>> flowMod(FlowModInput input, SwitchConnectionDistinguisher cookie) {
+    public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
         LOG.debug("Calling OFLibrary flowMod");
         Future<RpcResult<Void>> response = getConnectionAdapter(cookie).flowMod(input);
 
-        // Send the same Xid back to caller - MessageDrivenSwitch
-        UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();        
-        BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ;
-        flowModOutput.setTransactionId(new TransactionId(bigIntXid));
-
-        UpdateFlowOutput result = flowModOutput.build();
-        Collection<RpcError> errors = Collections.emptyList();
-        RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        // solution 1: sending directly and hooking listener to get error
-        // hookup listener to catch the possible error with no reference to returned future-object
-        LOG.debug("Returning to ModelDrivenSwitch for flowMod RPC");
-        return Futures.immediateFuture(rpcResult);
-
+        // appending xid
+        ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
+                JdkFutureAdapters.listenInPoolThread(response), 
+                new Function<RpcResult<Void>,RpcResult<UpdateFlowOutput>>() {
+
+            @Override
+            public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
+                UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();        
+                BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ;
+                flowModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+                UpdateFlowOutput result = flowModOutput.build();
+                RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(
+                        inputArg.isSuccessful(), result, inputArg.getErrors());
+                return rpcResult;
+            }
+        });
+        
+        return xidResult;
     }
 
     @Override
@@ -161,45 +166,55 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService {
     }
 
     @Override
-    public Future<RpcResult<UpdateGroupOutput>> groupMod(GroupModInput input, SwitchConnectionDistinguisher cookie) {        
+    public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {        
         LOG.debug("Calling OFLibrary groupMod");
         Future<RpcResult<Void>> response = getConnectionAdapter(cookie).groupMod(input);
 
-        // Send the same Xid back to caller - MessageDrivenSwitch
-        UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();      
-        BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
-        groupModOutput.setTransactionId(new TransactionId(bigIntXid));
-       
-        UpdateGroupOutput result = groupModOutput.build();
-        Collection<RpcError> errors = Collections.emptyList();
-        RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        // solution 1: sending directly and hooking listener to get error
-        // hookup listener to catch the possible error with no reference to returned future-object
-        LOG.debug("Returning to ModelDrivenSwitch for groupMod RPC");
-        return Futures.immediateFuture(rpcResult);
-
+        // appending xid
+        ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
+                JdkFutureAdapters.listenInPoolThread(response), 
+                new Function<RpcResult<Void>,RpcResult<UpdateGroupOutput>>() {
+
+            @Override
+            public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
+                UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();      
+                BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+                groupModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+                UpdateGroupOutput result = groupModOutput.build();
+                RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(
+                        inputArg.isSuccessful(), result, inputArg.getErrors());
+                return rpcResult;
+            }
+        });
+        
+        return xidResult;
     }
 
     @Override
-    public Future<RpcResult<UpdateMeterOutput>> meterMod(MeterModInput input, SwitchConnectionDistinguisher cookie) {
+    public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
         LOG.debug("Calling OFLibrary meterMod");
         Future<RpcResult<Void>> response = getConnectionAdapter(cookie).meterMod(input);
 
-        // Send the same Xid back to caller - MessageDrivenSwitch
-        UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();       
-        BigInteger bigIntXid =BigInteger.valueOf(input.getXid());
-        meterModOutput.setTransactionId(new TransactionId(bigIntXid));
+        // appending xid
+        ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
+                JdkFutureAdapters.listenInPoolThread(response), 
+                new Function<RpcResult<Void>,RpcResult<UpdateMeterOutput>>() {
+
+            @Override
+            public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
+                UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();       
+                BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+                meterModOutput.setTransactionId(new TransactionId(bigIntXid));
+                
+                UpdateMeterOutput result = meterModOutput.build();
+                RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(
+                        inputArg.isSuccessful(), result, inputArg.getErrors());
+                return rpcResult;
+            }
+        });
         
-        UpdateMeterOutput result = meterModOutput.build();
-        Collection<RpcError> errors = Collections.emptyList();
-        RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        // solution 1: sending directly and hooking listener to get error
-        // hookup listener to catch the possible error with no reference to returned future-object
-        LOG.debug("Returning to ModelDrivenSwitch for meterMod RPC");
-        return Futures.immediateFuture(rpcResult);
-
+        return xidResult;
     }
 
     @Override
@@ -213,22 +228,29 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService {
     }
 
     @Override
-    public Future<RpcResult<UpdatePortOutput>> portMod(PortModInput input, SwitchConnectionDistinguisher cookie) {
-
+    public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
         LOG.debug("Calling OFLibrary portMod");
         Future<RpcResult<Void>> response = getConnectionAdapter(cookie).portMod(input);
-
-        // Send the same Xid back to caller - ModelDrivenSwitch
-        UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
-        String stringXid =input.getXid().toString();
-        BigInteger bigIntXid = new BigInteger( stringXid );
-        portModOutput.setTransactionId(new TransactionId(bigIntXid));
-        UpdatePortOutput result = portModOutput.build();
-        Collection<RpcError> errors = Collections.emptyList();
-        RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning to ModelDrivenSwitch for portMod RPC");
-        return Futures.immediateFuture(rpcResult);
+        
+        // appending xid
+        ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
+                JdkFutureAdapters.listenInPoolThread(response), 
+                new Function<RpcResult<Void>,RpcResult<UpdatePortOutput>>() {
+
+            @Override
+            public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
+                UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
+                BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+                portModOutput.setTransactionId(new TransactionId(bigIntXid));
+                
+                UpdatePortOutput result = portModOutput.build();
+                RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(
+                        inputArg.isSuccessful(), result, inputArg.getErrors());
+                return rpcResult;
+            }
+        });
+        
+        return xidResult;
     }
 
     @Override
index cb5b2190aef17db5f879f779d3365cab38ba6a22..764fef56d0f8e1fce2c8a48482f174bc61f3d9aa 100644 (file)
@@ -18,11 +18,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
-import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
index a3204b3b76db26da01d85724d75765db214442ad..6e16b6eb7d10caee2e4953dd0edcc1008096feeb 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.openflowplugin.openflow.md.core.session;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
@@ -25,6 +24,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 /**
  * @author mirehak
  */
@@ -116,10 +117,10 @@ public interface SessionManager extends AutoCloseable {
     /**
      * @param newFixedThreadPool
      */
-    void setRpcPool(ExecutorService newFixedThreadPool);
+    void setRpcPool(ListeningExecutorService newFixedThreadPool);
 
     /**
      * @return the rpcPool instance
      */
-    ExecutorService getRpcPool();
+    ListeningExecutorService getRpcPool();
 }
index 0caa5de64c9730fc0cdf026a9a3fd6d0faca628b..afda07fc38961ec1c24156b9a0082811d47cfd36 100644 (file)
@@ -14,7 +14,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
@@ -30,6 +29,8 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 /**
  * @author mirehak
  */
@@ -41,11 +42,12 @@ public class SessionManagerOFImpl implements SessionManager {
     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping;
 
-
     protected ListenerRegistry<SessionListener> sessionListeners;
     private NotificationProviderService notificationProviderService;
 
     private DataProviderService dataProviderService;
+    private ListeningExecutorService rpcPool;
+    
 
     /**
      * @return singleton instance
@@ -196,8 +198,7 @@ public class SessionManagerOFImpl implements SessionManager {
             }
         }
     };
-    private ExecutorService rpcPool;
-
+    
 
     @Override
     public Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> getTranslatorMapping() {
@@ -252,12 +253,12 @@ public class SessionManagerOFImpl implements SessionManager {
     }
 
     @Override
-    public void setRpcPool(ExecutorService rpcPool) {
+    public void setRpcPool(ListeningExecutorService rpcPool) {
         this.rpcPool = rpcPool;
     }
     
     @Override
-    public ExecutorService getRpcPool() {
+    public ListeningExecutorService getRpcPool() {
         return rpcPool;
     }
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/TransactionKey.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/TransactionKey.java
deleted file mode 100644 (file)
index bb2fae9..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.opendaylight.openflowplugin.openflow.md.core.session;
-
-public class TransactionKey {
-
-    private static final long serialVersionUID = 7805731164917659700L;
-    final private Long _xId;
-
-    public TransactionKey(Long transactionId) {
-        this._xId = transactionId;
-    }
-
-    public Long getXId() {
-        return _xId;
-    }
-
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((_xId == null) ? 0 : _xId.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(java.lang.Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        TransactionKey other = (TransactionKey) obj;
-        if (_xId == null) {
-            if (other._xId != null) {
-                return false;
-            }
-        } else if (!_xId.equals(other._xId)) {
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("TransactionId [_xId=");
-        builder.append(_xId);
-        builder.append("]");
-        return builder.toString();
-    }
-
-}
index 7eafce1e9695eb6d9dae16a3bd0fa2d03dd6474c..30b77fdf02425722aacebdfdc99c6199810218c3 100644 (file)
@@ -19,7 +19,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev130731.matc
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.aggregate._case.MultipartRequestAggregateBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.flow._case.MultipartRequestFlowBuilder;
 
-public class FlowCreatorUtil {
+public abstract class FlowCreatorUtil {
     
     public static void setWildcardedFlowMatch(short version,MultipartRequestFlowBuilder flowBuilder){
         if(version == OFConstants.OFP_VERSION_1_0){
index 7aadc6c12f716203ef2318b86e78a4a98e24bc32..81a00302c0fbca7a4eea5166d4d61cff933203e3 100644 (file)
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.math.BigInteger;
 import java.util.List;
 
-public class InventoryDataServiceUtil {
+public abstract class InventoryDataServiceUtil {
     private final static Logger LOG = LoggerFactory.getLogger(InventoryDataServiceUtil.class);
 
     public final static String OF_URI_PREFIX = "openflow:";
index 5ba0ab58c7aa79cc9387728c32eff67b0e9df76f..0e237f436663edba8ce99bcd52f487f98f4f6c3e 100644 (file)
@@ -24,7 +24,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortStateV10;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
 
-public class PortTranslatorUtil {
+public abstract class PortTranslatorUtil {
     public static  org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortFeatures translatePortFeatures(PortFeatures apf) {
         org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortFeatures napf = null;
         if(apf != null){
index 41572000b63acaf06072c91587a422c8615967cb..f84f3fcf9882a1505a53ae13dec201ec381cff04 100644 (file)
@@ -12,7 +12,6 @@ import java.math.BigInteger;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -21,7 +20,6 @@ import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.controller.sal.common.util.Futures;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.openflowplugin.openflow.md.OFConstants;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
@@ -29,7 +27,6 @@ import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistingu
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
@@ -43,8 +40,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * simple NPE smoke test
@@ -76,7 +73,7 @@ public class ModelDrivenSwitchImplTest {
         Mockito.when(context.getFeatures()).thenReturn(features);
         Mockito.when(features.getDatapathId()).thenReturn(BigInteger.valueOf(1));
         
-        OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10));
+        OFSessionUtil.getSessionManager().setRpcPool(MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)));
 
         mdSwitchOF10 = new ModelDrivenSwitchImpl(null, null, context);
         mdSwitchOF13 = new ModelDrivenSwitchImpl(null, null, context);
index 57b202ad9bd8a1cb8eb4adb85ee34d7d72156971..764d7db58dec8a9c0e596dd703fc4dc281870519 100644 (file)
@@ -7,9 +7,9 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.core.sal;
 
-import junit.framework.Assert;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;