BUG-956 deadlock by rpc invocation - phase2
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / sal / OFRpcTaskFactory.java
index 09281355a8070489f6ad1893a87927e44a4e46ee..921d49421b57f3673e9697ee0282761da8e65d59 100644 (file)
 package org.opendaylight.openflowplugin.openflow.md.core.sal;
 
 import java.math.BigInteger;
+import java.util.Collection;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
-import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.MeterConvertor;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInputBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
 /**
  *
  */
 public abstract class OFRpcTaskFactory {
 
     /**
-     * @param maxTimeout
-     * @param maxTimeoutUnit
-     * @param helper
+     * @param taskContext 
+     * @param input 
+     * @param cookie 
      * @return UpdateFlow task
      */
     public static OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> createAddFlowTask(
-            final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
-        OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
-                new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>() {
-
+            OFRpcTaskContext taskContext, AddFlowInput input, 
+            SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = 
+                new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
+            
             @Override
-            public void run() {
-                helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().isBarrier(), getCookie(), getResult());
-                if (getResult().isDone()) {
-                    return;
-                }
-
-                // Convert the AddFlowInput to FlowModInput
-                FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(),
-                        getVersion(), getSession().getFeatures().getDatapathId());
-                Long xId = getSession().getNextXid();
-                ofFlowModInput.setXid(xId);
-
-                if (null != getRpcNotificationProviderService()) {
-                    FlowAddedBuilder newFlow = new FlowAddedBuilder(
-                            (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput());
-                    newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-                    newFlow.setFlowRef(getInput().getFlowRef());
-                    getRpcNotificationProviderService().publish(newFlow.build());
+            public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
+                ListenableFuture<RpcResult<UpdateFlowOutput>> result = SettableFuture.create();
+                
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the AddFlowInput to FlowModInput
+                    FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), 
+                            getVersion(), getSession().getFeatures().getDatapathId());
+                    final Long xId = getSession().getNextXid();
+                    ofFlowModInput.setXid(xId);
+                    
+                    Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
+                            getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createFlowAddedNotification(xId, getInput()));
                 }
 
-                getSession().getbulkTransactionCache().put(new TransactionKey(xId), getInput());
-                Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
-                        getMessageService().flowMod(ofFlowModInput.build(), getCookie());
-                OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+                return result;
             }
         };
+        
         return task;
     }
 
     /**
-     * @param maxTimeout
-     * @param maxTimeoutUnit
-     * @param helper
+     * @param xId
+     * @return
+     */
+    protected static NotificationComposer<FlowAdded> createFlowAddedNotification(
+            final Long xId, final AddFlowInput input) {
+        return new NotificationComposer<FlowAdded>() {
+            @Override
+            public FlowAdded compose() {
+                FlowAddedBuilder newFlow = new FlowAddedBuilder((Flow) input);
+                newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                newFlow.setFlowRef(input.getFlowRef());
+                return newFlow.build();
+            }
+        };
+    }
+
+    /**
+     * @param taskContext 
+     * @param input 
+     * @param cookie 
      * @return UpdateFlow task
      */
     public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
-            final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
-        OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
-                new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>() {
+            OFRpcTaskContext taskContext, UpdateFlowInput input, 
+            SwitchConnectionDistinguisher cookie) {
+        
+        OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task = 
+                new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
+            
+            @Override
+            public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
+                ListenableFuture<RpcResult<UpdateFlowOutput>> result = null;
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
+                        getInput().getUpdatedFlow().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the AddFlowInput to FlowModInput
+                    FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(), 
+                            getVersion(), getSession().getFeatures().getDatapathId());
+                    Long xId = getSession().getNextXid();
+                    ofFlowModInput.setXid(xId);
+    
+                    Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
+                            getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createFlowUpdatedNotification(xId, getInput()));
+                }
+                return result;
+            }
+        };
+        return task;
+    }
 
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<FlowUpdated> createFlowUpdatedNotification(
+            final Long xId, final UpdateFlowInput input) {
+        return new NotificationComposer<FlowUpdated>() {
+            @Override
+            public FlowUpdated compose() {
+                FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
+                updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                updFlow.setFlowRef(input.getFlowRef());
+                return updFlow.build();
+            }
+        };
+    }
+    
+    /**
+     * @param taskContext
+     * @param input
+     * @param cookie
+     * @return update group task
+     */
+    public static OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> createAddGroupTask(
+            final OFRpcTaskContext taskContext, AddGroupInput input, 
+            final SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task = 
+                new OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
+            
             @Override
-            public void run() {
-                helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().getUpdatedFlow().isBarrier(), getCookie(), getResult());
-                if (getResult().isDone()) {
-                    return;
+            public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
+                ListenableFuture<RpcResult<UpdateGroupOutput>> result = SettableFuture.create();
+                
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the AddGroupInput to GroupModInput
+                    GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), 
+                            getVersion(), getSession().getFeatures().getDatapathId());
+                    final Long xId = getSession().getNextXid();
+                    ofGroupModInput.setXid(xId);
+                    
+                    Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = getMessageService()
+                            .groupMod(ofGroupModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createGroupAddedNotification(xId, getInput()));
                 }
 
-                // Convert the AddFlowInput to FlowModInput
-                FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(),
-                        getVersion(), getSession().getFeatures().getDatapathId());
-                Long xId = getSession().getNextXid();
-                ofFlowModInput.setXid(xId);
+                return result;
+            }
+        };
+        
+        return task;
+    }
+    
 
-                if (null != getRpcNotificationProviderService()) {
-                    FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(getInput().getUpdatedFlow());
-                    updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-                    updFlow.setFlowRef(getInput().getFlowRef());
-                    getRpcNotificationProviderService().publish(updFlow.build());
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<GroupAdded> createGroupAddedNotification(
+            final Long xId, final AddGroupInput input) {
+        return new NotificationComposer<GroupAdded>() {
+            @Override
+            public GroupAdded compose() {
+                GroupAddedBuilder groupMod = new GroupAddedBuilder((Group) input);
+                groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                groupMod.setGroupRef(input.getGroupRef());
+                return groupMod.build();
+            }
+        };
+    }
+
+    /**
+     * @param taskContext
+     * @param input
+     * @param cookie
+     * @return update meter task
+     */
+    public static OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> createAddMeterTask(
+            OFRpcTaskContext taskContext, AddMeterInput input,
+            SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task = 
+                new OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
+            
+            @Override
+            public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
+                ListenableFuture<RpcResult<UpdateMeterOutput>> result = SettableFuture.create();
+                
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the AddGroupInput to GroupModInput
+                    MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion());
+                    final Long xId = getSession().getNextXid();
+                    ofMeterModInput.setXid(xId);
+                    
+                    Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = getMessageService()
+                            .meterMod(ofMeterModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createMeterAddedNotification(xId, getInput()));
                 }
 
-                getSession().getbulkTransactionCache().put(new TransactionKey(xId), getInput());
-                Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
-                        getMessageService().flowMod(ofFlowModInput.build(), getCookie());
-                OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+                return result;
             }
         };
+        
         return task;
+        
+    }
+
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<MeterAdded> createMeterAddedNotification(
+            final Long xId, final AddMeterInput input) {
+        return new NotificationComposer<MeterAdded>() {
+            @Override
+            public MeterAdded compose() {
+                MeterAddedBuilder meterMod = new MeterAddedBuilder((Meter) input);
+                meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                meterMod.setMeterRef(input.getMeterRef());
+                return meterMod.build();
+            }
+        };
+    }
+    
+    /**
+     * @param taskContext 
+     * @param input 
+     * @param cookie 
+     * @return UpdateFlow task
+     */
+    public static OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> createUpdateGroupTask(
+            OFRpcTaskContext taskContext, UpdateGroupInput input, 
+            SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task = 
+                new OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
+            
+            @Override
+            public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
+                ListenableFuture<RpcResult<UpdateGroupOutput>> result = null;
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
+                        getInput().getUpdatedGroup().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the UpdateGroupInput to GroupModInput
+                    GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(
+                            getInput().getUpdatedGroup(), getVersion(),
+                            getSession().getFeatures().getDatapathId());
+                    final Long xId = getSession().getNextXid();
+                    ofGroupModInput.setXid(xId);
+    
+                    Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = 
+                            getMessageService().groupMod(ofGroupModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createGroupUpdatedNotification(xId, getInput()));
+                }
+                return result;
+            }
+        };
+        return task;
+    }
+    
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<GroupUpdated> createGroupUpdatedNotification(
+            final Long xId, final UpdateGroupInput input) {
+        return new NotificationComposer<GroupUpdated>() {
+            @Override
+            public GroupUpdated compose() {
+                GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
+                groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                groupMod.setGroupRef(input.getGroupRef());
+                return groupMod.build();
+            }
+        };
     }
 
+    /**
+     * @param taskContext 
+     * @param input
+     * @param cookie
+     * @return update meter task 
+     */
+    public static OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> createUpdateMeterTask(
+            OFRpcTaskContext taskContext, UpdateMeterInput input,
+            SwitchConnectionDistinguisher cookie) {
+        OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task = 
+                new OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
+            
+            @Override
+            public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
+                ListenableFuture<RpcResult<UpdateMeterOutput>> result = null;
+                Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), 
+                        getInput().getUpdatedMeter().isBarrier(), getCookie());
+                if (!barrierErrors.isEmpty()) {
+                    OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
+                } else {
+                    // Convert the UpdateMeterInput to MeterModInput
+                    MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(
+                            getInput().getUpdatedMeter(), getVersion());
+                    final Long xId = getSession().getNextXid();
+                    ofMeterModInput.setXid(xId);
+    
+                    Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = 
+                            getMessageService().meterMod(ofMeterModInput.build(), getCookie());
+                    result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                    
+                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
+                            createMeterUpdatedNotification(xId, getInput()));
+                }
+                return result;
+            }
+        };
+        return task;
+    }
+    
+    /**
+     * @param xId
+     * @param input
+     * @return
+     */
+    protected static NotificationComposer<MeterUpdated> createMeterUpdatedNotification(
+            final Long xId, final UpdateMeterInput input) {
+        return new NotificationComposer<MeterUpdated>() {
+            @Override
+            public MeterUpdated compose() {
+                MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
+                meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                meterMod.setMeterRef(input.getMeterRef());
+                return meterMod.build();
+            }
+        };
+    }
 }