Merge "additional fix fro BUG-782 unregistering switch providers"
authorMichal Rehak <mirehak@cisco.com>
Wed, 7 May 2014 22:43:47 +0000 (22:43 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 7 May 2014 22:43:47 +0000 (22:43 +0000)
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/translator/PacketInTranslator.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImplTest.java

index 156c823af62e346294a4764762ef963fbc8ccdfc..acd3814e7f192c9b1cb479794ae8fea1ff8bd6b8 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.openflowplugin.openflow.md.core;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
 import org.opendaylight.openflowplugin.openflow.md.OFConstants;
@@ -83,6 +84,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -105,6 +108,7 @@ public class MDController implements IMDController, AutoCloseable {
     final private int OF13 = OFConstants.OFP_VERSION_1_3;
 
     private ErrorHandlerQueueImpl errorHandler;
+    private ExecutorService rpcPool;
 
 
     /**
@@ -195,6 +199,11 @@ public class MDController implements IMDController, AutoCloseable {
         // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually
         OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators);
         OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners);
+        
+        // prepare worker pool for rpc
+        // TODO: get size from configSubsystem
+        OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10));
+        
     }
 
     /**
index 0ffddbe2baf6c8f234d329289d94a30320df16e4..4b36178d2038169b066d7e06c54c8c9ecd2e1ebc 100644 (file)
@@ -7,8 +7,14 @@
  */
 package org.opendaylight.openflowplugin.openflow.md.core.sal;
 
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.openflowjava.protocol.api.util.BinContent;
@@ -29,16 +35,12 @@ import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
 import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemovedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutputBuilder;
@@ -177,12 +179,9 @@ import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Future;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
 
 /**
  * RPC implementation of MD-switch
@@ -195,81 +194,46 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     private short version = 0;
     private final SessionContext session;
     NotificationProviderService rpcNotificationProviderService;
-
-    protected ModelDrivenSwitchImpl(NodeId nodeId, InstanceIdentifier<Node> identifier, SessionContext context) {
+    private OFRpcTaskHelper rpcTaskHelper;
+    
+    // TODO:read timeout from configSubsystem
+    protected long maxTimeout = 1000;
+    protected TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
+    
+    protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext context) {
         super(identifier, context);
         this.nodeId = nodeId;
         messageService = sessionContext.getMessageDispatchService();
         version = context.getPrimaryConductor().getVersion();
         this.session = context;
         rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService();
+        rpcTaskHelper = new OFRpcTaskHelper(messageService, context, rpcNotificationProviderService);
     }
 
     @Override
-    public Future<RpcResult<AddFlowOutput>> addFlow(AddFlowInput input) {
+    public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
         LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
-        Long xId = null;
-        // For Flow provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
-
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
-        if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
-            xId = session.getNextXid();
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            @SuppressWarnings("unused")
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the AddFlowInput to FlowModInput
-        FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, this.getSessionContext()
-                .getFeatures().getDatapathId());
-        xId = session.getNextXid();
-        ofFlowModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            FlowAddedBuilder newFlow = new FlowAddedBuilder(
-                    (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) input);
-            newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            newFlow.setFlowRef(input.getFlowRef());
-            rpcNotificationProviderService.publish(newFlow.build());
-        }
-
-        session.getbulkTransactionCache().put(new TransactionKey(xId), input);
-        Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = messageService.flowMod(ofFlowModInput.build(), cookie);
-        RpcResult<UpdateFlowOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for AddFlow RPC" + ex.getMessage());
-        }
-
-        UpdateFlowOutput updateFlowOutput = rpcResultFromOFLib.getResult();
-
-        AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder();
-        addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId());
-        AddFlowOutput result = addFlowOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<AddFlowOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Add Flow RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        
+        OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = 
+                OFRpcTaskFactory.createAddFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
+        rpcTaskHelper.initTask(task, input, cookie);
+        OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+        
+        return Futures.transform(JdkFutureAdapters.listenInPoolThread(task.getResult()), 
+                OFRpcFutureResultTransformFactory.createForAddFlowOutput());
     }
 
+
     @Override
-    public Future<RpcResult<AddGroupOutput>> addGroup(AddGroupInput input) {
+    public Future<RpcResult<AddGroupOutput>> addGroup(final AddGroupInput input) {
         LOG.debug("Calling the GroupMod RPC method on MessageDispatchService");
         Long xId = null;
 
-        // For Flow provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
-
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
+        
         if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
             xId = session.getNextXid();
             BarrierInputBuilder barrierInput = new BarrierInputBuilder();
@@ -317,7 +281,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<AddMeterOutput>> addMeter(AddMeterInput input) {
+    public Future<RpcResult<AddMeterOutput>> addMeter(final AddMeterInput input) {
         LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
         Long xId = null;
         // For Meter provisioning, the SwitchConnectionDistinguisher is set to
@@ -372,7 +336,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<RemoveFlowOutput>> removeFlow(RemoveFlowInput input) {
+    public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
         LOG.debug("Calling the removeFlow RPC method on MessageDispatchService");
         Long xId = null;
         // For Flow provisioning, the SwitchConnectionDistinguisher is set to
@@ -428,7 +392,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<RemoveGroupOutput>> removeGroup(RemoveGroupInput input) {
+    public Future<RpcResult<RemoveGroupOutput>> removeGroup(final RemoveGroupInput input) {
         LOG.debug("Calling the Remove Group RPC method on MessageDispatchService");
         Long xId = null;
 
@@ -485,7 +449,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<RemoveMeterOutput>> removeMeter(RemoveMeterInput input) {
+    public Future<RpcResult<RemoveMeterOutput>> removeMeter(final RemoveMeterInput input) {
         LOG.debug("Calling the Remove MeterMod RPC method on MessageDispatchService");
         Long xId = null;
 
@@ -540,7 +504,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<Void>> transmitPacket(TransmitPacketInput input) {
+    public Future<RpcResult<Void>> transmitPacket(final TransmitPacketInput input) {
         LOG.debug("TransmitPacket - {}", input);
         // Convert TransmitPacket to PacketOutInput
         PacketOutInput message = PacketOutConvertor.toPacketOutInput(input, version, sessionContext.getNextXid(),
@@ -556,7 +520,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         return messageService.packetOut(message, cookie);
     }
 
-    private FlowModInputBuilder toFlowModInputBuilder(Flow source) {
+    private FlowModInputBuilder toFlowModInputBuilder(final Flow source) {
         FlowModInputBuilder target = new FlowModInputBuilder();
         target.setCookie(source.getCookie().getValue());
         target.setIdleTimeout(source.getIdleTimeout());
@@ -566,7 +530,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         return target;
     }
 
-    private Match toMatch(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) {
+    private Match toMatch(final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) {
         MatchBuilder target = new MatchBuilder();
 
         target.setMatchEntries(toMatchEntries(match));
@@ -575,70 +539,29 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     private List<MatchEntries> toMatchEntries(
-            org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) {
+            final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) {
         List<MatchEntries> entries = new ArrayList<>();
 
         return null;
     }
 
     @Override
-    public Future<RpcResult<UpdateFlowOutput>> updateFlow(UpdateFlowInput input) {
+    public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
         LOG.debug("Calling the updateFlow RPC method on MessageDispatchService");
-        Long xId = null;
-        // Call the RPC method on MessageDispatchService
-
-        // For Flow provisioning, the SwitchConnectionDistinguisher is set to
-        // null so
-        // the request can be routed through any connection to the switch
-
+        
+        // use primary connection
         SwitchConnectionDistinguisher cookie = null;
-        if (Objects.firstNonNull(input.getUpdatedFlow().isBarrier(), Boolean.FALSE)) {
-            BarrierInputBuilder barrierInput = new BarrierInputBuilder();
-            xId = session.getNextXid();
-            barrierInput.setVersion(version);
-            barrierInput.setXid(xId);
-            Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
-        }
-
-        // Convert the UpdateFlowInput to FlowModInput
-        FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input.getUpdatedFlow(), version, this
-                .getSessionContext().getFeatures().getDatapathId());
-        xId = session.getNextXid();
-        ofFlowModInput.setXid(xId);
-
-        if (null != rpcNotificationProviderService) {
-            FlowUpdatedBuilder updateFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
-            updateFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
-            updateFlow.setFlowRef(input.getFlowRef());
-            rpcNotificationProviderService.publish(updateFlow.build());
-        }
-
-        session.getbulkTransactionCache().put(new TransactionKey(xId), input);
-        Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = messageService.flowMod(ofFlowModInput.build(), cookie);
-
-        RpcResult<UpdateFlowOutput> rpcResultFromOFLib = null;
-
-        try {
-            rpcResultFromOFLib = resultFromOFLib.get();
-        } catch (Exception ex) {
-            LOG.error(" Error while getting result for UpdateFlow RPC" + ex.getMessage());
-        }
-
-        UpdateFlowOutput updateFlowOutputOFLib = rpcResultFromOFLib.getResult();
-
-        UpdateFlowOutputBuilder updateFlowOutput = new UpdateFlowOutputBuilder();
-        updateFlowOutput.setTransactionId(updateFlowOutputOFLib.getTransactionId());
-        UpdateFlowOutput result = updateFlowOutput.build();
-
-        Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
-        RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
-        LOG.debug("Returning the Update Flow RPC result to MD-SAL");
-        return Futures.immediateFuture(rpcResult);
+        
+        OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task = 
+                OFRpcTaskFactory.createUpdateFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
+        rpcTaskHelper.initTask(task, input, cookie);
+        OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+        
+        return task.getResult();
     }
 
     @Override
-    public Future<RpcResult<UpdateGroupOutput>> updateGroup(UpdateGroupInput input) {
+    public Future<RpcResult<UpdateGroupOutput>> updateGroup(final UpdateGroupInput input) {
         LOG.debug("Calling the update Group Mod RPC method on MessageDispatchService");
         Long xId = null;
 
@@ -694,7 +617,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<UpdateMeterOutput>> updateMeter(UpdateMeterInput input) {
+    public Future<RpcResult<UpdateMeterOutput>> updateMeter(final UpdateMeterInput input) {
         LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
         Long xId = null;
 
@@ -756,7 +679,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
      * Methods for requesting statistics from switch
      */
     @Override
-    public Future<RpcResult<GetAllGroupStatisticsOutput>> getAllGroupStatistics(GetAllGroupStatisticsInput input) {
+    public Future<RpcResult<GetAllGroupStatisticsOutput>> getAllGroupStatistics(final GetAllGroupStatisticsInput input) {
 
         GetAllGroupStatisticsOutputBuilder output = new GetAllGroupStatisticsOutputBuilder();
         Collection<RpcError> errors = Collections.emptyList();
@@ -804,7 +727,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<GetGroupDescriptionOutput>> getGroupDescription(GetGroupDescriptionInput input) {
+    public Future<RpcResult<GetGroupDescriptionOutput>> getGroupDescription(final GetGroupDescriptionInput input) {
 
         GetGroupDescriptionOutputBuilder output = new GetGroupDescriptionOutputBuilder();
         Collection<RpcError> errors = Collections.emptyList();
@@ -850,7 +773,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<GetGroupFeaturesOutput>> getGroupFeatures(GetGroupFeaturesInput input) {
+    public Future<RpcResult<GetGroupFeaturesOutput>> getGroupFeatures(final GetGroupFeaturesInput input) {
 
         GetGroupFeaturesOutputBuilder output = new GetGroupFeaturesOutputBuilder();
         Collection<RpcError> errors = Collections.emptyList();
@@ -893,7 +816,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<GetGroupStatisticsOutput>> getGroupStatistics(GetGroupStatisticsInput input) {
+    public Future<RpcResult<GetGroupStatisticsOutput>> getGroupStatistics(final GetGroupStatisticsInput input) {
 
         GetGroupStatisticsOutputBuilder output = new GetGroupStatisticsOutputBuilder();
         Collection<RpcError> errors = Collections.emptyList();
@@ -942,7 +865,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetAllMeterConfigStatisticsOutput>> getAllMeterConfigStatistics(
-            GetAllMeterConfigStatisticsInput input) {
+            final GetAllMeterConfigStatisticsInput input) {
 
         GetAllMeterConfigStatisticsOutputBuilder output = new GetAllMeterConfigStatisticsOutputBuilder();
         Collection<RpcError> errors = Collections.emptyList();
@@ -990,7 +913,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<GetAllMeterStatisticsOutput>> getAllMeterStatistics(GetAllMeterStatisticsInput input) {
+    public Future<RpcResult<GetAllMeterStatisticsOutput>> getAllMeterStatistics(final GetAllMeterStatisticsInput input) {
 
         GetAllMeterStatisticsOutputBuilder output = new GetAllMeterStatisticsOutputBuilder();
         Collection<RpcError> errors = Collections.emptyList();
@@ -1037,7 +960,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<GetMeterFeaturesOutput>> getMeterFeatures(GetMeterFeaturesInput input) {
+    public Future<RpcResult<GetMeterFeaturesOutput>> getMeterFeatures(final GetMeterFeaturesInput input) {
 
         GetMeterFeaturesOutputBuilder output = new GetMeterFeaturesOutputBuilder();
         Collection<RpcError> errors = Collections.emptyList();
@@ -1080,7 +1003,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<GetMeterStatisticsOutput>> getMeterStatistics(GetMeterStatisticsInput input) {
+    public Future<RpcResult<GetMeterStatisticsOutput>> getMeterStatistics(final GetMeterStatisticsInput input) {
 
         GetMeterStatisticsOutputBuilder output = new GetMeterStatisticsOutputBuilder();
         Collection<RpcError> errors = Collections.emptyList();
@@ -1130,7 +1053,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> getAllNodeConnectorsStatistics(
-            GetAllNodeConnectorsStatisticsInput arg0) {
+            final GetAllNodeConnectorsStatisticsInput arg0) {
 
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
@@ -1171,7 +1094,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetNodeConnectorStatisticsOutput>> getNodeConnectorStatistics(
-            GetNodeConnectorStatisticsInput arg0) {
+            final GetNodeConnectorStatisticsInput arg0) {
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
 
@@ -1211,7 +1134,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         return Futures.immediateFuture(rpcResult);
     }
 
-    private TransactionId generateTransactionId(Long xid) {
+    private TransactionId generateTransactionId(final Long xid) {
         String stringXid = xid.toString();
         BigInteger bigIntXid = new BigInteger(stringXid);
         return new TransactionId(bigIntXid);
@@ -1219,7 +1142,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<UpdatePortOutput>> updatePort(UpdatePortInput input) {
+    public Future<RpcResult<UpdatePortOutput>> updatePort(final UpdatePortInput input) {
         PortModInput ofPortModInput = null;
         RpcResult<UpdatePortOutput> rpcResultFromOFLib = null;
 
@@ -1277,7 +1200,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<UpdateTableOutput>> updateTable(UpdateTableInput input) {
+    public Future<RpcResult<UpdateTableOutput>> updateTable(final UpdateTableInput input) {
 
         // Get the Xid. The same Xid has to be sent in all the Multipart
         // requests
@@ -1324,7 +1247,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> getAllFlowStatisticsFromFlowTable(
-            GetAllFlowStatisticsFromFlowTableInput arg0) {
+            final GetAllFlowStatisticsFromFlowTableInput arg0) {
 
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
@@ -1370,7 +1293,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> getAllFlowsStatisticsFromAllFlowTables(
-            GetAllFlowsStatisticsFromAllFlowTablesInput arg0) {
+            final GetAllFlowsStatisticsFromAllFlowTablesInput arg0) {
 
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
@@ -1418,7 +1341,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> getFlowStatisticsFromFlowTable(
-            GetFlowStatisticsFromFlowTableInput arg0) {
+            final GetFlowStatisticsFromFlowTableInput arg0) {
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
 
@@ -1481,7 +1404,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> getAggregateFlowStatisticsFromFlowTableForAllFlows(
-            GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput arg0) {
+            final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput arg0) {
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
 
@@ -1527,7 +1450,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput>> getAggregateFlowStatisticsFromFlowTableForGivenMatch(
-            GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput arg0) {
+            final GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput arg0) {
 
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
@@ -1580,7 +1503,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
     }
 
     @Override
-    public Future<RpcResult<GetFlowTablesStatisticsOutput>> getFlowTablesStatistics(GetFlowTablesStatisticsInput arg0) {
+    public Future<RpcResult<GetFlowTablesStatisticsOutput>> getFlowTablesStatistics(final GetFlowTablesStatisticsInput arg0) {
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
 
@@ -1618,7 +1541,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> getAllQueuesStatisticsFromAllPorts(
-            GetAllQueuesStatisticsFromAllPortsInput arg0) {
+            final GetAllQueuesStatisticsFromAllPortsInput arg0) {
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
 
@@ -1663,7 +1586,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetAllQueuesStatisticsFromGivenPortOutput>> getAllQueuesStatisticsFromGivenPort(
-            GetAllQueuesStatisticsFromGivenPortInput arg0) {
+            final GetAllQueuesStatisticsFromGivenPortInput arg0) {
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
 
@@ -1709,7 +1632,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
 
     @Override
     public Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> getQueueStatisticsFromGivenPort(
-            GetQueueStatisticsFromGivenPortInput arg0) {
+            final GetQueueStatisticsFromGivenPortInput arg0) {
         // Generate xid to associate it with the request
         Long xid = this.getSessionContext().getNextXid();
 
@@ -1751,5 +1674,21 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         RpcResult<GetQueueStatisticsFromGivenPortOutput> rpcResult = Rpcs.getRpcResult(true, output.build(), errors);
         return Futures.immediateFuture(rpcResult);
     }
+    
+    /**
+     * @param input
+     * @param cookie
+     * @param session
+     * @param messageService 
+     * @return barrier result
+     */
+    public static Future<RpcResult<BarrierOutput>> sendBarrier(
+            SwitchConnectionDistinguisher cookie, SessionContext session, 
+            IMessageDispatchService messageService) {
+        BarrierInputBuilder barrierInput = new BarrierInputBuilder();
+        barrierInput.setVersion(session.getFeatures().getVersion());
+        barrierInput.setXid(session.getNextXid());
+        return messageService.barrier(barrierInput.build(), cookie);
+    }
 
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java
new file mode 100644 (file)
index 0000000..a0bb177
--- /dev/null
@@ -0,0 +1,65 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import java.util.Collection;
+
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+
+/**
+ * collection of transformation functions dedicated to rpc future results  
+ */
+public abstract class OFRpcFutureResultTransformFactory {
+    
+    protected static Logger LOG = LoggerFactory
+            .getLogger(OFRpcFutureResultTransformFactory.class);
+
+    /**
+     * @return translator from {@link UpdateFlowOutput} to {@link AddFlowOutput}
+     */
+    public static Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>> createForAddFlowOutput() {
+        return new Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>>() {
+
+            @Override
+            public RpcResult<AddFlowOutput> apply(final RpcResult<UpdateFlowOutput> input) {
+
+                UpdateFlowOutput updateFlowOutput = input.getResult();
+
+                AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder();
+                addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId());
+                AddFlowOutput result = addFlowOutput.build();
+
+                RpcResult<AddFlowOutput> rpcResult = assembleRpcResult(input, result);
+                LOG.debug("Returning the Add Flow RPC result to MD-SAL");
+                return rpcResult;
+            }
+
+        };
+    }
+    
+    
+    /**
+     * @param input
+     * @param result
+     * @return
+     */
+    protected static <E> RpcResult<E> assembleRpcResult(RpcResult<?> input, E result) {
+        Collection<RpcError> errors = input.getErrors();
+        RpcResult<E> rpcResult = Rpcs.getRpcResult(input.isSuccessful(), result, errors);
+        return rpcResult;
+    }
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java
new file mode 100644 (file)
index 0000000..b153fcc
--- /dev/null
@@ -0,0 +1,121 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * @param <T> input type
+ * @param <K> future output type
+ */
+public abstract class OFRpcTask<T, K> implements Runnable {
+    
+    private SwitchConnectionDistinguisher cookie;
+    private IMessageDispatchService messageService;
+    private SessionContext session;
+    private T input;
+    private SettableFuture<K> result;
+    private NotificationProviderService rpcNotificationProviderService;
+    
+    /**
+     * @return the result
+     */
+    public SettableFuture<K> getResult() {
+        return result;
+    }
+    
+    /**
+     * @param result the result to set
+     */
+    public void setResult(SettableFuture<K> result) {
+        this.result = result;
+    }
+
+    /**
+     * @return the cookie
+     */
+    public SwitchConnectionDistinguisher getCookie() {
+        return cookie;
+    }
+
+    /**
+     * @return the messageService
+     */
+    public IMessageDispatchService getMessageService() {
+        return messageService;
+    }
+
+    /**
+     * @return the session
+     */
+    public SessionContext getSession() {
+        return session;
+    }
+    
+    /**
+     * @return protocol version
+     */
+    public Short getVersion() {
+        return session.getFeatures().getVersion();
+    }
+
+    /**
+     * @param cookie the cookie to set
+     */
+    public void setCookie(SwitchConnectionDistinguisher cookie) {
+        this.cookie = cookie;
+    }
+
+    /**
+     * @param messageService the messageService to set
+     */
+    public void setMessageService(IMessageDispatchService messageService) {
+        this.messageService = messageService;
+    }
+
+    /**
+     * @param session the session to set
+     */
+    public void setSession(SessionContext session) {
+        this.session = session;
+    }
+
+    /**
+     * @return the input
+     */
+    public T getInput() {
+        return input;
+    }
+
+    /**
+     * @param input the input to set
+     */
+    public void setInput(T input) {
+        this.input = input;
+    }
+
+    /**
+     * @param rpcNotificationProviderService
+     */
+    public void setRpcNotificationProviderService(
+            NotificationProviderService rpcNotificationProviderService) {
+                this.rpcNotificationProviderService = rpcNotificationProviderService;
+    }
+    
+    /**
+     * @return the rpcNotificationProviderService
+     */
+    public NotificationProviderService getRpcNotificationProviderService() {
+        return rpcNotificationProviderService;
+    }
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java
new file mode 100644 (file)
index 0000000..5bb1119
--- /dev/null
@@ -0,0 +1,111 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import java.math.BigInteger;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
+import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+/**
+ * 
+ */
+public abstract class OFRpcTaskFactory {
+
+    /**
+     * @param maxTimeout 
+     * @param maxTimeoutUnit 
+     * @param helper 
+     * @return UpdateFlow task
+     */
+    public static OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> createAddFlowTask(
+            final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
+        OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = 
+                new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>() {
+            
+            @Override
+            public void run() {
+                helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().isBarrier(), getCookie(), getResult());
+                if (getResult().isDone()) {
+                    return;
+                }
+
+                // Convert the AddFlowInput to FlowModInput
+                FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), 
+                        getVersion(), getSession().getFeatures().getDatapathId());
+                Long xId = getSession().getNextXid();
+                ofFlowModInput.setXid(xId);
+
+                if (null != getRpcNotificationProviderService()) {
+                    FlowAddedBuilder newFlow = new FlowAddedBuilder(
+                            (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput());
+                    newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                    newFlow.setFlowRef(getInput().getFlowRef());
+                    getRpcNotificationProviderService().publish(newFlow.build());
+                }
+
+                getSession().getbulkTransactionCache().put(new TransactionKey(xId), getInput());
+                Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
+                        getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+                OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+            }
+        };
+        return task;
+    }
+    
+    /**
+     * @param maxTimeout 
+     * @param maxTimeoutUnit 
+     * @param helper 
+     * @return UpdateFlow task
+     */
+    public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
+            final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
+        OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task = 
+                new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>() {
+            
+            @Override
+            public void run() {
+                helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().getUpdatedFlow().isBarrier(), getCookie(), getResult());
+                if (getResult().isDone()) {
+                    return;
+                }
+
+                // Convert the AddFlowInput to FlowModInput
+                FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(), 
+                        getVersion(), getSession().getFeatures().getDatapathId());
+                Long xId = getSession().getNextXid();
+                ofFlowModInput.setXid(xId);
+
+                if (null != getRpcNotificationProviderService()) {
+                    FlowAddedBuilder newFlow = new FlowAddedBuilder(
+                            (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput());
+                    newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+                    newFlow.setFlowRef(getInput().getFlowRef());
+                    getRpcNotificationProviderService().publish(newFlow.build());
+                }
+
+                getSession().getbulkTransactionCache().put(new TransactionKey(xId), getInput());
+                Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = 
+                        getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+                OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+            }
+        };
+        return task;
+    }
+    
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java
new file mode 100644 (file)
index 0000000..d844a65
--- /dev/null
@@ -0,0 +1,109 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * 
+ */
+public class OFRpcTaskHelper {
+
+    private IMessageDispatchService messageService;
+    private SessionContext session;
+    private NotificationProviderService rpcNotificationProviderService;
+    /**
+     * @param cookie
+     * @param messageService
+     * @param session
+     * @param rpcNotificationProviderService 
+     */
+    public OFRpcTaskHelper(IMessageDispatchService messageService, SessionContext session, 
+            NotificationProviderService rpcNotificationProviderService) {
+        this.messageService = messageService;
+        this.session = session;
+        this.rpcNotificationProviderService = rpcNotificationProviderService;
+    }
+    
+    
+    /**
+     * @param task
+     * @param input 
+     * @param cookie 
+     * @return inited task
+     */
+    public <T, K> OFRpcTask<T, K> initTask(OFRpcTask<T, K> task, T input, SwitchConnectionDistinguisher cookie) {
+        task.setMessageService(messageService);
+        task.setSession(session);
+        task.setRpcNotificationProviderService(rpcNotificationProviderService);
+        task.setResult(SettableFuture.<K>create());
+        task.setCookie(cookie);
+        task.setInput(input);
+        return task;
+    }
+    
+    /**
+     * @param intern 
+     * @param wrapper 
+     */
+    public static <K> void chainFutures(final Future<K> intern, final SettableFuture<K> wrapper) {
+        Futures.addCallback(
+                JdkFutureAdapters.listenInPoolThread(intern),
+                new FutureCallback<K>() {
+
+                    @Override
+                    public void onSuccess(
+                            K result) {
+                        wrapper.set(result);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        wrapper.setException(t);
+                    }
+
+                });
+    }
+    
+    /**
+     * @param maxTimeout
+     * @param maxTimeoutUnit
+     * @param isBarrier 
+     * @param cookie 
+     * @param result 
+     */
+    public <T> void rawBarrierSend(final long maxTimeout, final TimeUnit maxTimeoutUnit, 
+            Boolean isBarrier, SwitchConnectionDistinguisher cookie, SettableFuture<RpcResult<T>> result) {
+        if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
+            Future<RpcResult<BarrierOutput>> barrierFuture = ModelDrivenSwitchImpl.sendBarrier(cookie, session, messageService);
+            try {
+                RpcResult<BarrierOutput> barrierResult = barrierFuture.get(maxTimeout, maxTimeoutUnit);
+                if (!barrierResult.isSuccessful()) {
+                    result.set(Rpcs.<T>getRpcResult(false, barrierResult.getErrors()));
+                }
+            } catch (Exception e) {
+                result.setException(e);
+            }
+        }
+    }
+}
index 41b4fae9068175f0b1809a674eb6e85a22f54658..e6f3df76446a6bba2bf9f51408a051aedb51191a 100644 (file)
@@ -170,7 +170,6 @@ public abstract class OFSessionUtil {
      * @return pop listener Map
      */
     public static Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> getPopListenerMapping() {
-        // TODO Auto-generated method stub
         return getSessionManager().getPopListenerMapping();
     }
 
index fd3bab743821d069460ef1fa78b7db1af7106d23..19c9180c8e5b806629dbff3f3f4f051b3483edce 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.openflowplugin.openflow.md.core.session;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
@@ -111,4 +112,14 @@ public interface SessionManager extends AutoCloseable {
      * @param popListenerMapping the popListenerMapping to set
      */
     void setPopListenerMapping(Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping);
+
+    /**
+     * @param newFixedThreadPool
+     */
+    void setRpcPool(ExecutorService newFixedThreadPool);
+
+    /**
+     * @return the rpcPool instance
+     */
+    ExecutorService getRpcPool();
 }
index 9f4429dbc3521d58f2427f8f305f3060cf34a911..0af916d6dcb45c06465816c1fbdaafbe735dd600 100644 (file)
@@ -14,6 +14,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
@@ -195,6 +196,7 @@ public class SessionManagerOFImpl implements SessionManager {
             }
         }
     };
+    private ExecutorService rpcPool;
 
 
     @Override
@@ -244,6 +246,18 @@ public class SessionManagerOFImpl implements SessionManager {
             for (SessionContext sessionContext : sessionLot.values()) {
                 sessionContext.getPrimaryConductor().disconnect();
             }
+            // TODO: handle timeouted shutdown
+            rpcPool.shutdown();
         }
     }
+
+    @Override
+    public void setRpcPool(ExecutorService rpcPool) {
+        this.rpcPool = rpcPool;
+    }
+    
+    @Override
+    public ExecutorService getRpcPool() {
+        return rpcPool;
+    }
 }
index 245ecb5b095609d239b46b9d37c5f249a504a718..f547040998b87950fb809de56d444f49734374a2 100644 (file)
@@ -54,6 +54,8 @@ public class PacketInTranslator implements IMDMessageTranslator<OfHeader, List<D
            // create a packet received event builder
            PacketReceivedBuilder pktInBuilder = new PacketReceivedBuilder();
            pktInBuilder.setPayload(message.getData());
+           //TODO: add connection cookie
+           //pktInBuilder.setConnectionCookie(new ConnectionCookie(cookie.getId()));
 
            // get the DPID
            GetFeaturesOutput features = sc.getFeatures();
index 92cfe5ec40c04d2c95e8182cdb53c86252322065..1494186f2dba2d962509d27cd5c3d012e6992088 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.openflowplugin.openflow.md.core.sal;
 import java.math.BigInteger;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.Before;
@@ -26,6 +27,7 @@ import org.opendaylight.openflowplugin.openflow.md.OFConstants;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
@@ -77,6 +79,8 @@ public class ModelDrivenSwitchImplTest {
         Mockito.when(context.getFeatures()).thenReturn(features);
         Mockito.when(context.getbulkTransactionCache()).thenReturn(bulkTransactionCache);
         Mockito.when(features.getDatapathId()).thenReturn(BigInteger.valueOf(1));
+        
+        OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10));
 
         mdSwitchOF10 = new ModelDrivenSwitchImpl(null, null, context);
         mdSwitchOF13 = new ModelDrivenSwitchImpl(null, null, context);