BUG-956 deadlock by rpc invocation - phase2
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / sal / ModelDrivenSwitchImpl.java
index 4b57aac794c2557a53778ba3dfe615fc3bcb65f1..de5883e445534cee7fe440102eb0119dc4deb8ed 100644 (file)
@@ -30,7 +30,6 @@ import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.match.Matc
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
 import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
@@ -64,8 +63,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.p
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemovedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput;
@@ -90,8 +87,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemovedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput;
@@ -182,6 +177,7 @@ import org.slf4j.Logger;
 import com.google.common.base.Objects;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * RPC implementation of MD-switch
@@ -192,22 +188,28 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     private final NodeId nodeId;
     private final IMessageDispatchService messageService;
     private short version = 0;
-    private final SessionContext session;
-    NotificationProviderService rpcNotificationProviderService;
-    private OFRpcTaskHelper rpcTaskHelper;
+    private NotificationProviderService rpcNotificationProviderService;
+    private OFRpcTaskContext rpcTaskContext;
     
     // TODO:read timeout from configSubsystem
     protected long maxTimeout = 1000;
     protected TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
     
-    protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext context) {
-        super(identifier, context);
+    protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, 
+            final SessionContext sessionContext) {
+        super(identifier, sessionContext);
         this.nodeId = nodeId;
         messageService = sessionContext.getMessageDispatchService();
-        version = context.getPrimaryConductor().getVersion();
-        this.session = context;
+        version = sessionContext.getPrimaryConductor().getVersion();
         rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService();
-        rpcTaskHelper = new OFRpcTaskHelper(messageService, context, rpcNotificationProviderService);
+        
+        rpcTaskContext = new OFRpcTaskContext();
+        rpcTaskContext.setSession(sessionContext);
+        rpcTaskContext.setMessageService(messageService);
+        rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService);
+        rpcTaskContext.setMaxTimeout(maxTimeout);
+        rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
+        rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool());
     }
 
     @Override
@@ -217,11 +219,10 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         SwitchConnectionDistinguisher cookie = null;
         
         OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = 
-                OFRpcTaskFactory.createAddFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
-        rpcTaskHelper.initTask(task, input, cookie);
-        OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+                OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit();
         
-        return Futures.transform(JdkFutureAdapters.listenInPoolThread(task.getResult()), 
+        return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), 
                 OFRpcFutureResultTransformFactory.createForAddFlowOutput());
     }
 
@@ -229,108 +230,31 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     @Override
     public Future<RpcResult<AddGroupOutput>> addGroup(final AddGroupInput input) {
         LOG.debug("Calling the GroupMod RPC method on MessageDispatchService");
-        Long xId = null;
-
+        
         // use primary connection
         SwitchConnectionDistinguisher cookie = null;
         
-        if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the AddGroupInput to GroupModInput
-        GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext()
-                .getFeatures().getDatapathId());
-        xId = session.getNextXid();
-        ofGroupModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            GroupAddedBuilder groupMod = new GroupAddedBuilder(
-                    (org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group) input);
-            groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            groupMod.setGroupRef(input.getGroupRef());
-            rpcNotificationProviderService.publish(groupMod.build());
-        }
-
-        Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie);
-        RpcResult<UpdateGroupOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for AddGroup RPC" + ex.getMessage());
-        }
-
-        UpdateGroupOutput updateGroupOutput = rpcResultFromOFLib.getResult();
-
-        AddGroupOutputBuilder addGroupOutput = new AddGroupOutputBuilder();
-        addGroupOutput.setTransactionId(updateGroupOutput.getTransactionId());
-        AddGroupOutput result = addGroupOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<AddGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Add Group RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task = 
+                OFRpcTaskFactory.createAddGroupTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateGroupOutput>> result = task.submit();
+        
+        return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), 
+                OFRpcFutureResultTransformFactory.createForAddGroupOutput());
     }
 
     @Override
     public Future<RpcResult<AddMeterOutput>> addMeter(final AddMeterInput input) {
         LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
-        Long xId = null;
-        // For Meter provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
-
+        
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
-        if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the AddMeterInput to MeterModInput
-        MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version);
-        xId = session.getNextXid();
-        ofMeterModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            MeterAddedBuilder meterMod = new MeterAddedBuilder(
-                    (org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter) input);
-            meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            meterMod.setMeterRef(input.getMeterRef());
-            rpcNotificationProviderService.publish(meterMod.build());
-        }
-
-        Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie);
-
-        RpcResult<UpdateMeterOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for AddMeter RPC" + ex.getMessage());
-        }
-
-        UpdateMeterOutput updateMeterOutput = rpcResultFromOFLib.getResult();
-
-        AddMeterOutputBuilder addMeterOutput = new AddMeterOutputBuilder();
-        addMeterOutput.setTransactionId(updateMeterOutput.getTransactionId());
-        AddMeterOutput result = addMeterOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<AddMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Add Meter RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        
+        OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task = 
+                OFRpcTaskFactory.createAddMeterTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateMeterOutput>> result = task.submit();
+        
+        return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), 
+                OFRpcFutureResultTransformFactory.createForAddMeterOutput());
     }
 
     @Override
@@ -344,7 +268,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         SwitchConnectionDistinguisher cookie = null;
         if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
             BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            xId = session.getNextXid();
+            xId = sessionContext.getNextXid();
             barrierInput.setXid(xId);
             barrierInput.setVersion(version);
             @SuppressWarnings("unused")
@@ -352,9 +276,9 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         }
 
         // Convert the RemoveFlowInput to FlowModInput
-        FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, this.getSessionContext()
+        FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, sessionContext
                 .getFeatures().getDatapathId());
-        xId = session.getNextXid();
+        xId = sessionContext.getNextXid();
         ofFlowModInput.setXid(xId);
 
         if (null != rpcNotificationProviderService) {
@@ -399,7 +323,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
         SwitchConnectionDistinguisher cookie = null;
         if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
+            xId = sessionContext.getNextXid();
             BarrierInputBuilder barrierInput = new BarrierInputBuilder();
             barrierInput.setVersion(version);
             barrierInput.setXid(xId);
@@ -410,7 +334,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         // Convert the RemoveGroupInput to GroupModInput
         GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext()
                 .getFeatures().getDatapathId());
-        xId = session.getNextXid();
+        xId = sessionContext.getNextXid();
         ofGroupModInput.setXid(xId);
 
         if (null != rpcNotificationProviderService) {
@@ -454,7 +378,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         // the request can be routed through any connection to the switch
         SwitchConnectionDistinguisher cookie = null;
         if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
+            xId = sessionContext.getNextXid();
             BarrierInputBuilder barrierInput = new BarrierInputBuilder();
             barrierInput.setVersion(version);
             barrierInput.setXid(xId);
@@ -464,7 +388,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
         // Convert the RemoveMeterInput to MeterModInput
         MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version);
-        xId = session.getNextXid();
+        xId = sessionContext.getNextXid();
         ofMeterModInput.setXid(xId);
 
         if (null != rpcNotificationProviderService) {
@@ -548,119 +472,38 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         SwitchConnectionDistinguisher cookie = null;
         
         OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task = 
-                OFRpcTaskFactory.createUpdateFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
-        rpcTaskHelper.initTask(task, input, cookie);
-        OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+                OFRpcTaskFactory.createUpdateFlowTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit();
         
-        return task.getResult();
+        return result;
     }
 
     @Override
     public Future<RpcResult<UpdateGroupOutput>> updateGroup(final UpdateGroupInput input) {
         LOG.debug("Calling the update Group Mod RPC method on MessageDispatchService");
-        Long xId = null;
-
-        // For Flow provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
-
+        
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
-        if (Objects.firstNonNull(input.getUpdatedGroup().isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the UpdateGroupInput to GroupModInput
-        GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input.getUpdatedGroup(), version, this
-                .getSessionContext().getFeatures().getDatapathId());
-        xId = session.getNextXid();
-        ofGroupModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
-            groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            groupMod.setGroupRef(input.getGroupRef());
-            rpcNotificationProviderService.publish(groupMod.build());
-        }
-
-        Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie);
-
-        RpcResult<UpdateGroupOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for updateGroup RPC" + ex.getMessage());
-        }
-
-        UpdateGroupOutput updateGroupOutputOFLib = rpcResultFromOFLib.getResult();
-
-        UpdateGroupOutputBuilder updateGroupOutput = new UpdateGroupOutputBuilder();
-        updateGroupOutput.setTransactionId(updateGroupOutputOFLib.getTransactionId());
-        UpdateGroupOutput result = updateGroupOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Update Group RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        
+        OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task = 
+                OFRpcTaskFactory.createUpdateGroupTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateGroupOutput>> result = task.submit();
+        
+        return result;
     }
 
     @Override
     public Future<RpcResult<UpdateMeterOutput>> updateMeter(final UpdateMeterInput input) {
         LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
-        Long xId = null;
-
-        // For Meter provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
+        
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
-        if (Objects.firstNonNull(input.getUpdatedMeter().isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the UpdateMeterInput to MeterModInput
-        MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input.getUpdatedMeter(), version);
-        xId = session.getNextXid();
-        ofMeterModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
-            meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            meterMod.setMeterRef(input.getMeterRef());
-            rpcNotificationProviderService.publish(meterMod.build());
-        }
-
-        Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie);
-
-        RpcResult<UpdateMeterOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for UpdateMeter RPC" + ex.getMessage());
-        }
-
-        UpdateMeterOutput updateMeterOutputFromOFLib = rpcResultFromOFLib.getResult();
-
-        UpdateMeterOutputBuilder updateMeterOutput = new UpdateMeterOutputBuilder();
-        updateMeterOutput.setTransactionId(updateMeterOutputFromOFLib.getTransactionId());
-        UpdateMeterOutput result = updateMeterOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Update Meter RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        
+        OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task = 
+                OFRpcTaskFactory.createUpdateMeterTask(rpcTaskContext, input, cookie);
+        ListenableFuture<RpcResult<UpdateMeterOutput>> result = task.submit();
+        
+        return result;
     }
 
     @Override
@@ -1667,21 +1510,4 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         RpcResult<GetQueueStatisticsFromGivenPortOutput> rpcResult = Rpcs.getRpcResult(true, output.build(), errors);
         return Futures.immediateFuture(rpcResult);
     }
-    
-    /**
-     * @param input
-     * @param cookie
-     * @param session
-     * @param messageService 
-     * @return barrier result
-     */
-    public static Future<RpcResult<BarrierOutput>> sendBarrier(
-            SwitchConnectionDistinguisher cookie, SessionContext session, 
-            IMessageDispatchService messageService) {
-        BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-        barrierInput.setVersion(session.getFeatures().getVersion());
-        barrierInput.setXid(session.getNextXid());
-        return messageService.barrier(barrierInput.build(), cookie);
-    }
-
 }