From 19199462c5e6e100d03c6a70081d673df3b00a59 Mon Sep 17 00:00:00 2001 From: Michal Rehak Date: Thu, 8 May 2014 22:09:43 +0200 Subject: [PATCH] BUG-956 deadlock by rpc invocation - phase2 - tuning Future chaining - introduced ListeningExecutorService - dereference used - clean up after bulkTransactionCache removal - add/update meter and group refactored in order to follow future responses and run in pool Change-Id: I5c9367512c3ab6f1799f30dad1344667fde63991 Signed-off-by: Michal Rehak --- .../openflow/md/OFConstants.java | 5 + .../openflow/md/core/MDController.java | 38 +- .../openflow/md/core/MessageFactory.java | 30 ++ .../md/core/sal/ModelDrivenSwitchImpl.java | 294 +++---------- .../md/core/sal/NotificationComposer.java | 22 + .../OFRpcFutureResultTransformFactory.java | 67 ++- .../openflow/md/core/sal/OFRpcTask.java | 111 ++--- .../md/core/sal/OFRpcTaskContext.java | 104 +++++ .../md/core/sal/OFRpcTaskFactory.java | 403 +++++++++++++++--- .../openflow/md/core/sal/OFRpcTaskHelper.java | 109 ----- .../openflow/md/core/sal/OFRpcTaskUtil.java | 121 ++++++ .../session/MessageDispatchServiceImpl.java | 144 ++++--- .../md/core/session/SessionContextOFImpl.java | 5 - .../md/core/session/SessionManager.java | 7 +- .../md/core/session/SessionManagerOFImpl.java | 13 +- .../md/core/session/TransactionKey.java | 56 --- .../openflow/md/util/FlowCreatorUtil.java | 2 +- .../md/util/InventoryDataServiceUtil.java | 2 +- .../openflow/md/util/PortTranslatorUtil.java | 2 +- .../core/sal/ModelDrivenSwitchImplTest.java | 9 +- .../md/core/sal/SwitchFeaturesUtilTest.java | 2 +- 21 files changed, 932 insertions(+), 614 deletions(-) create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/NotificationComposer.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java delete mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java create mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java delete mode 100644 openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/TransactionKey.java diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/OFConstants.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/OFConstants.java index f125390447..bbc2786e5a 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/OFConstants.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/OFConstants.java @@ -39,4 +39,9 @@ public class OFConstants { public static final int MAC_ADDRESS_LENGTH = 6; public static final int SIZE_OF_LONG_IN_BYTES = 8; public static final int SIGNUM_UNSIGNED = 1; + + /** RpcError application tag */ + public static final String APPLICATION_TAG = "OPENFLOW_PLUGIN"; + /** RpcError tag - timeout */ + public static final String ERROR_TAG_TIMEOUT = "TIMOUT"; } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java index 5e0a6a923b..147bb60ed6 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java @@ -8,9 +8,18 @@ package org.opendaylight.openflowplugin.openflow.md.core; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration; import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider; @@ -76,22 +85,12 @@ import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; /** - * @author mirehak * */ public class MDController implements IMDController, AutoCloseable { @@ -108,8 +107,6 @@ public class MDController implements IMDController, AutoCloseable { final private int OF13 = OFConstants.OFP_VERSION_1_3; private ErrorHandlerSimpleImpl errorHandler; - private ExecutorService rpcPool; - /** * @return translator mapping @@ -202,7 +199,8 @@ public class MDController implements IMDController, AutoCloseable { // prepare worker pool for rpc // TODO: get size from configSubsystem - OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10)); + OFSessionUtil.getSessionManager().setRpcPool( + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10))); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java index 6266e22ef1..c876917f28 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java @@ -11,6 +11,8 @@ import java.util.ArrayList; import java.util.List; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements; @@ -93,4 +95,32 @@ public abstract class MessageFactory { } return result; } + + /** + * @param ofVersion + * @param ofXid + * @return barrier message + */ + public static BarrierInput createBarrier(short ofVersion, long ofXid) { + BarrierInputBuilder barrierInput = new BarrierInputBuilder(); + barrierInput.setVersion(ofVersion); + barrierInput.setXid(ofXid); + return barrierInput.build(); + } + +// /** +// * @param input +// * @param cookie +// * @param session +// * @param messageService +// * @return barrier result +// */ +// public static Future> sendBarrier( +// SwitchConnectionDistinguisher cookie, SessionContext session, +// IMessageDispatchService messageService) { +// BarrierInputBuilder barrierInput = new BarrierInputBuilder(); +// barrierInput.setVersion(session.getFeatures().getVersion()); +// barrierInput.setXid(session.getNextXid()); +// return messageService.barrier(barrierInput.build(), cookie); +// } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java index 4b57aac794..de5883e445 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java @@ -30,7 +30,6 @@ import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.match.Matc import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; -import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey; import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil; import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; @@ -64,8 +63,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.p import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemovedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput; @@ -90,8 +87,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemovedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput; @@ -182,6 +177,7 @@ import org.slf4j.Logger; import com.google.common.base.Objects; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; /** * RPC implementation of MD-switch @@ -192,22 +188,28 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { private final NodeId nodeId; private final IMessageDispatchService messageService; private short version = 0; - private final SessionContext session; - NotificationProviderService rpcNotificationProviderService; - private OFRpcTaskHelper rpcTaskHelper; + private NotificationProviderService rpcNotificationProviderService; + private OFRpcTaskContext rpcTaskContext; // TODO:read timeout from configSubsystem protected long maxTimeout = 1000; protected TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS; - protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier identifier, final SessionContext context) { - super(identifier, context); + protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier identifier, + final SessionContext sessionContext) { + super(identifier, sessionContext); this.nodeId = nodeId; messageService = sessionContext.getMessageDispatchService(); - version = context.getPrimaryConductor().getVersion(); - this.session = context; + version = sessionContext.getPrimaryConductor().getVersion(); rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService(); - rpcTaskHelper = new OFRpcTaskHelper(messageService, context, rpcNotificationProviderService); + + rpcTaskContext = new OFRpcTaskContext(); + rpcTaskContext.setSession(sessionContext); + rpcTaskContext.setMessageService(messageService); + rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService); + rpcTaskContext.setMaxTimeout(maxTimeout); + rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit); + rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool()); } @Override @@ -217,11 +219,10 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { SwitchConnectionDistinguisher cookie = null; OFRpcTask> task = - OFRpcTaskFactory.createAddFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper); - rpcTaskHelper.initTask(task, input, cookie); - OFSessionUtil.getSessionManager().getRpcPool().submit(task); + OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie); + ListenableFuture> result = task.submit(); - return Futures.transform(JdkFutureAdapters.listenInPoolThread(task.getResult()), + return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), OFRpcFutureResultTransformFactory.createForAddFlowOutput()); } @@ -229,108 +230,31 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { @Override public Future> addGroup(final AddGroupInput input) { LOG.debug("Calling the GroupMod RPC method on MessageDispatchService"); - Long xId = null; - + // use primary connection SwitchConnectionDistinguisher cookie = null; - if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) { - xId = session.getNextXid(); - BarrierInputBuilder barrierInput = new BarrierInputBuilder(); - barrierInput.setVersion(version); - barrierInput.setXid(xId); - @SuppressWarnings("unused") - Future> barrierOFLib = messageService.barrier(barrierInput.build(), cookie); - } - - // Convert the AddGroupInput to GroupModInput - GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext() - .getFeatures().getDatapathId()); - xId = session.getNextXid(); - ofGroupModInput.setXid(xId); - - if (null != rpcNotificationProviderService) { - GroupAddedBuilder groupMod = new GroupAddedBuilder( - (org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group) input); - groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); - groupMod.setGroupRef(input.getGroupRef()); - rpcNotificationProviderService.publish(groupMod.build()); - } - - Future> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie); - RpcResult rpcResultFromOFLib = null; - - try { - rpcResultFromOFLib = resultFromOFLib.get(); - } catch (Exception ex) { - LOG.error(" Error while getting result for AddGroup RPC" + ex.getMessage()); - } - - UpdateGroupOutput updateGroupOutput = rpcResultFromOFLib.getResult(); - - AddGroupOutputBuilder addGroupOutput = new AddGroupOutputBuilder(); - addGroupOutput.setTransactionId(updateGroupOutput.getTransactionId()); - AddGroupOutput result = addGroupOutput.build(); - - Collection errors = rpcResultFromOFLib.getErrors(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - LOG.debug("Returning the Add Group RPC result to MD-SAL"); - return Futures.immediateFuture(rpcResult); + OFRpcTask> task = + OFRpcTaskFactory.createAddGroupTask(rpcTaskContext, input, cookie); + ListenableFuture> result = task.submit(); + + return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), + OFRpcFutureResultTransformFactory.createForAddGroupOutput()); } @Override public Future> addMeter(final AddMeterInput input) { LOG.debug("Calling the MeterMod RPC method on MessageDispatchService"); - Long xId = null; - // For Meter provisioning, the SwitchConnectionDistinguisher is set to - // null so - // the request can be routed through any connection to the switch - + + // use primary connection SwitchConnectionDistinguisher cookie = null; - if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) { - xId = session.getNextXid(); - BarrierInputBuilder barrierInput = new BarrierInputBuilder(); - barrierInput.setVersion(version); - barrierInput.setXid(xId); - @SuppressWarnings("unused") - Future> barrierOFLib = messageService.barrier(barrierInput.build(), cookie); - } - - // Convert the AddMeterInput to MeterModInput - MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version); - xId = session.getNextXid(); - ofMeterModInput.setXid(xId); - - if (null != rpcNotificationProviderService) { - MeterAddedBuilder meterMod = new MeterAddedBuilder( - (org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter) input); - meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); - meterMod.setMeterRef(input.getMeterRef()); - rpcNotificationProviderService.publish(meterMod.build()); - } - - Future> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie); - - RpcResult rpcResultFromOFLib = null; - - try { - rpcResultFromOFLib = resultFromOFLib.get(); - } catch (Exception ex) { - LOG.error(" Error while getting result for AddMeter RPC" + ex.getMessage()); - } - - UpdateMeterOutput updateMeterOutput = rpcResultFromOFLib.getResult(); - - AddMeterOutputBuilder addMeterOutput = new AddMeterOutputBuilder(); - addMeterOutput.setTransactionId(updateMeterOutput.getTransactionId()); - AddMeterOutput result = addMeterOutput.build(); - - Collection errors = rpcResultFromOFLib.getErrors(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - LOG.debug("Returning the Add Meter RPC result to MD-SAL"); - return Futures.immediateFuture(rpcResult); + + OFRpcTask> task = + OFRpcTaskFactory.createAddMeterTask(rpcTaskContext, input, cookie); + ListenableFuture> result = task.submit(); + + return Futures.transform(JdkFutureAdapters.listenInPoolThread(result), + OFRpcFutureResultTransformFactory.createForAddMeterOutput()); } @Override @@ -344,7 +268,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { SwitchConnectionDistinguisher cookie = null; if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) { BarrierInputBuilder barrierInput = new BarrierInputBuilder(); - xId = session.getNextXid(); + xId = sessionContext.getNextXid(); barrierInput.setXid(xId); barrierInput.setVersion(version); @SuppressWarnings("unused") @@ -352,9 +276,9 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { } // Convert the RemoveFlowInput to FlowModInput - FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, this.getSessionContext() + FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, sessionContext .getFeatures().getDatapathId()); - xId = session.getNextXid(); + xId = sessionContext.getNextXid(); ofFlowModInput.setXid(xId); if (null != rpcNotificationProviderService) { @@ -399,7 +323,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { SwitchConnectionDistinguisher cookie = null; if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) { - xId = session.getNextXid(); + xId = sessionContext.getNextXid(); BarrierInputBuilder barrierInput = new BarrierInputBuilder(); barrierInput.setVersion(version); barrierInput.setXid(xId); @@ -410,7 +334,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { // Convert the RemoveGroupInput to GroupModInput GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext() .getFeatures().getDatapathId()); - xId = session.getNextXid(); + xId = sessionContext.getNextXid(); ofGroupModInput.setXid(xId); if (null != rpcNotificationProviderService) { @@ -454,7 +378,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { // the request can be routed through any connection to the switch SwitchConnectionDistinguisher cookie = null; if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) { - xId = session.getNextXid(); + xId = sessionContext.getNextXid(); BarrierInputBuilder barrierInput = new BarrierInputBuilder(); barrierInput.setVersion(version); barrierInput.setXid(xId); @@ -464,7 +388,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { // Convert the RemoveMeterInput to MeterModInput MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version); - xId = session.getNextXid(); + xId = sessionContext.getNextXid(); ofMeterModInput.setXid(xId); if (null != rpcNotificationProviderService) { @@ -548,119 +472,38 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { SwitchConnectionDistinguisher cookie = null; OFRpcTask> task = - OFRpcTaskFactory.createUpdateFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper); - rpcTaskHelper.initTask(task, input, cookie); - OFSessionUtil.getSessionManager().getRpcPool().submit(task); + OFRpcTaskFactory.createUpdateFlowTask(rpcTaskContext, input, cookie); + ListenableFuture> result = task.submit(); - return task.getResult(); + return result; } @Override public Future> updateGroup(final UpdateGroupInput input) { LOG.debug("Calling the update Group Mod RPC method on MessageDispatchService"); - Long xId = null; - - // For Flow provisioning, the SwitchConnectionDistinguisher is set to - // null so - // the request can be routed through any connection to the switch - + + // use primary connection SwitchConnectionDistinguisher cookie = null; - if (Objects.firstNonNull(input.getUpdatedGroup().isBarrier(), Boolean.FALSE)) { - xId = session.getNextXid(); - BarrierInputBuilder barrierInput = new BarrierInputBuilder(); - barrierInput.setVersion(version); - barrierInput.setXid(xId); - @SuppressWarnings("unused") - Future> barrierOFLib = messageService.barrier(barrierInput.build(), cookie); - } - - // Convert the UpdateGroupInput to GroupModInput - GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input.getUpdatedGroup(), version, this - .getSessionContext().getFeatures().getDatapathId()); - xId = session.getNextXid(); - ofGroupModInput.setXid(xId); - - if (null != rpcNotificationProviderService) { - GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup()); - groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); - groupMod.setGroupRef(input.getGroupRef()); - rpcNotificationProviderService.publish(groupMod.build()); - } - - Future> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie); - - RpcResult rpcResultFromOFLib = null; - - try { - rpcResultFromOFLib = resultFromOFLib.get(); - } catch (Exception ex) { - LOG.error(" Error while getting result for updateGroup RPC" + ex.getMessage()); - } - - UpdateGroupOutput updateGroupOutputOFLib = rpcResultFromOFLib.getResult(); - - UpdateGroupOutputBuilder updateGroupOutput = new UpdateGroupOutputBuilder(); - updateGroupOutput.setTransactionId(updateGroupOutputOFLib.getTransactionId()); - UpdateGroupOutput result = updateGroupOutput.build(); - - Collection errors = rpcResultFromOFLib.getErrors(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - LOG.debug("Returning the Update Group RPC result to MD-SAL"); - return Futures.immediateFuture(rpcResult); + + OFRpcTask> task = + OFRpcTaskFactory.createUpdateGroupTask(rpcTaskContext, input, cookie); + ListenableFuture> result = task.submit(); + + return result; } @Override public Future> updateMeter(final UpdateMeterInput input) { LOG.debug("Calling the MeterMod RPC method on MessageDispatchService"); - Long xId = null; - - // For Meter provisioning, the SwitchConnectionDistinguisher is set to - // null so - // the request can be routed through any connection to the switch + + // use primary connection SwitchConnectionDistinguisher cookie = null; - if (Objects.firstNonNull(input.getUpdatedMeter().isBarrier(), Boolean.FALSE)) { - xId = session.getNextXid(); - BarrierInputBuilder barrierInput = new BarrierInputBuilder(); - barrierInput.setVersion(version); - barrierInput.setXid(xId); - @SuppressWarnings("unused") - Future> barrierOFLib = messageService.barrier(barrierInput.build(), cookie); - } - - // Convert the UpdateMeterInput to MeterModInput - MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input.getUpdatedMeter(), version); - xId = session.getNextXid(); - ofMeterModInput.setXid(xId); - - if (null != rpcNotificationProviderService) { - MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter()); - meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); - meterMod.setMeterRef(input.getMeterRef()); - rpcNotificationProviderService.publish(meterMod.build()); - } - - Future> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie); - - RpcResult rpcResultFromOFLib = null; - - try { - rpcResultFromOFLib = resultFromOFLib.get(); - } catch (Exception ex) { - LOG.error(" Error while getting result for UpdateMeter RPC" + ex.getMessage()); - } - - UpdateMeterOutput updateMeterOutputFromOFLib = rpcResultFromOFLib.getResult(); - - UpdateMeterOutputBuilder updateMeterOutput = new UpdateMeterOutputBuilder(); - updateMeterOutput.setTransactionId(updateMeterOutputFromOFLib.getTransactionId()); - UpdateMeterOutput result = updateMeterOutput.build(); - - Collection errors = rpcResultFromOFLib.getErrors(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - LOG.debug("Returning the Update Meter RPC result to MD-SAL"); - return Futures.immediateFuture(rpcResult); + + OFRpcTask> task = + OFRpcTaskFactory.createUpdateMeterTask(rpcTaskContext, input, cookie); + ListenableFuture> result = task.submit(); + + return result; } @Override @@ -1667,21 +1510,4 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { RpcResult rpcResult = Rpcs.getRpcResult(true, output.build(), errors); return Futures.immediateFuture(rpcResult); } - - /** - * @param input - * @param cookie - * @param session - * @param messageService - * @return barrier result - */ - public static Future> sendBarrier( - SwitchConnectionDistinguisher cookie, SessionContext session, - IMessageDispatchService messageService) { - BarrierInputBuilder barrierInput = new BarrierInputBuilder(); - barrierInput.setVersion(session.getFeatures().getVersion()); - barrierInput.setXid(session.getNextXid()); - return messageService.barrier(barrierInput.build(), cookie); - } - } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/NotificationComposer.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/NotificationComposer.java new file mode 100644 index 0000000000..08e961a792 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/NotificationComposer.java @@ -0,0 +1,22 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core.sal; + +import org.opendaylight.yangtools.yang.binding.Notification; + +/** + * @param type of notification + * + */ +public interface NotificationComposer { + + /** + * @return notification instance + */ + N compose(); +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java index a0bb177b25..b2c1a42817 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcFutureResultTransformFactory.java @@ -13,6 +13,12 @@ import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; @@ -28,6 +34,17 @@ public abstract class OFRpcFutureResultTransformFactory { protected static Logger LOG = LoggerFactory .getLogger(OFRpcFutureResultTransformFactory.class); + /** + * @param input + * @param result + * @return + */ + protected static RpcResult assembleRpcResult(RpcResult input, E result) { + Collection errors = input.getErrors(); + RpcResult rpcResult = Rpcs.getRpcResult(input.isSuccessful(), result, errors); + return rpcResult; + } + /** * @return translator from {@link UpdateFlowOutput} to {@link AddFlowOutput} */ @@ -35,7 +52,7 @@ public abstract class OFRpcFutureResultTransformFactory { return new Function,RpcResult>() { @Override - public RpcResult apply(final RpcResult input) { + public RpcResult apply(RpcResult input) { UpdateFlowOutput updateFlowOutput = input.getResult(); @@ -51,15 +68,49 @@ public abstract class OFRpcFutureResultTransformFactory { }; } + /** + * @return translator from {@link UpdateGroupOutput} to {@link AddGroupOutput} + */ + public static Function, RpcResult> createForAddGroupOutput() { + return new Function,RpcResult>() { + + @Override + public RpcResult apply(RpcResult input) { + UpdateGroupOutput updateGroupOutput = input.getResult(); + + AddGroupOutputBuilder addGroupOutput = new AddGroupOutputBuilder(); + addGroupOutput.setTransactionId(updateGroupOutput.getTransactionId()); + AddGroupOutput result = addGroupOutput.build(); + + RpcResult rpcResult = assembleRpcResult(input, result); + LOG.debug("Returning the Add Group RPC result to MD-SAL"); + return rpcResult; + } + }; + } /** - * @param input - * @param result - * @return + * @return translator from {@link UpdateGroupOutput} to {@link AddGroupOutput} */ - protected static RpcResult assembleRpcResult(RpcResult input, E result) { - Collection errors = input.getErrors(); - RpcResult rpcResult = Rpcs.getRpcResult(input.isSuccessful(), result, errors); - return rpcResult; + public static Function, RpcResult> createForAddMeterOutput() { + return new Function,RpcResult>() { + + @Override + public RpcResult apply(final RpcResult input) { + UpdateMeterOutput updateMeterOutput = input.getResult(); + + AddMeterOutputBuilder addMeterOutput = new AddMeterOutputBuilder(); + addMeterOutput.setTransactionId(updateMeterOutput.getTransactionId()); + AddMeterOutput result = addMeterOutput.build(); + + RpcResult rpcResult = assembleRpcResult(input, result); + LOG.debug("Returning the Add Meter RPC result to MD-SAL"); + return rpcResult; + } + }; } + + + + } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java index b153fcc420..79764f9dd0 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTask.java @@ -7,38 +7,36 @@ */ package org.opendaylight.openflowplugin.openflow.md.core.sal; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; /** * @param input type * @param future output type */ -public abstract class OFRpcTask implements Runnable { +public abstract class OFRpcTask implements Callable> { - private SwitchConnectionDistinguisher cookie; - private IMessageDispatchService messageService; - private SessionContext session; + private OFRpcTaskContext taskContext; private T input; - private SettableFuture result; - private NotificationProviderService rpcNotificationProviderService; - - /** - * @return the result - */ - public SettableFuture getResult() { - return result; - } + private SwitchConnectionDistinguisher cookie; /** - * @param result the result to set + * @param taskContext + * @param input + * @param cookie */ - public void setResult(SettableFuture result) { - this.result = result; + public OFRpcTask(OFRpcTaskContext taskContext, SwitchConnectionDistinguisher cookie, T input) { + this.taskContext = taskContext; + this.cookie = cookie; + this.input = input; } /** @@ -49,73 +47,86 @@ public abstract class OFRpcTask implements Runnable { } /** - * @return the messageService + * @param cookie the cookie to set */ - public IMessageDispatchService getMessageService() { - return messageService; + public void setCookie(SwitchConnectionDistinguisher cookie) { + this.cookie = cookie; } /** - * @return the session + * @return the input */ - public SessionContext getSession() { - return session; + public T getInput() { + return input; } - + /** - * @return protocol version + * @param input the input to set */ - public Short getVersion() { - return session.getFeatures().getVersion(); + public void setInput(T input) { + this.input = input; } /** - * @param cookie the cookie to set + * @return the rpcNotificationProviderService */ - public void setCookie(SwitchConnectionDistinguisher cookie) { - this.cookie = cookie; + public NotificationProviderService getRpcNotificationProviderService() { + return taskContext.getRpcNotificationProviderService(); } /** - * @param messageService the messageService to set + * @return message service + * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMessageService() */ - public void setMessageService(IMessageDispatchService messageService) { - this.messageService = messageService; + public IMessageDispatchService getMessageService() { + return taskContext.getMessageService(); } /** - * @param session the session to set + * @return session + * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getSession() */ - public void setSession(SessionContext session) { - this.session = session; + public SessionContext getSession() { + return taskContext.getSession(); } /** - * @return the input + * @return max timeout + * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMaxTimeout() */ - public T getInput() { - return input; + public long getMaxTimeout() { + return taskContext.getMaxTimeout(); } /** - * @param input the input to set + * @return time unit for max timeout + * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMaxTimeoutUnit() */ - public void setInput(T input) { - this.input = input; + public TimeUnit getMaxTimeoutUnit() { + return taskContext.getMaxTimeoutUnit(); } - + /** - * @param rpcNotificationProviderService + * @return protocol version */ - public void setRpcNotificationProviderService( - NotificationProviderService rpcNotificationProviderService) { - this.rpcNotificationProviderService = rpcNotificationProviderService; + public Short getVersion() { + return taskContext.getSession().getFeatures().getVersion(); + } /** - * @return the rpcNotificationProviderService + * @return the taskContext */ - public NotificationProviderService getRpcNotificationProviderService() { - return rpcNotificationProviderService; + public OFRpcTaskContext getTaskContext() { + return taskContext; + } + + /** + * submit task into rpc worker pool + * @return future result of task + */ + public ListenableFuture submit() { + ListenableFuture> compoundResult = getTaskContext().getRpcPool().submit(this); + return Futures.dereference(compoundResult); } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java new file mode 100644 index 0000000000..e96da2bc26 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core.sal; + +import java.util.concurrent.TimeUnit; + +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; +import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; + +import com.google.common.util.concurrent.ListeningExecutorService; + +/** + * + */ +public class OFRpcTaskContext { + + private IMessageDispatchService messageService; + private SessionContext session; + private NotificationProviderService rpcNotificationProviderService; + private long maxTimeout; + private TimeUnit maxTimeoutUnit; + private ListeningExecutorService rpcPool; + + /** + * @return the messageService + */ + public IMessageDispatchService getMessageService() { + return messageService; + } + /** + * @param messageService the messageService to set + */ + public void setMessageService(IMessageDispatchService messageService) { + this.messageService = messageService; + } + /** + * @return the session + */ + public SessionContext getSession() { + return session; + } + /** + * @param session the session to set + */ + public void setSession(SessionContext session) { + this.session = session; + } + /** + * @return the rpcNotificationProviderService + */ + public NotificationProviderService getRpcNotificationProviderService() { + return rpcNotificationProviderService; + } + /** + * @param rpcNotificationProviderService the rpcNotificationProviderService to set + */ + public void setRpcNotificationProviderService( + NotificationProviderService rpcNotificationProviderService) { + this.rpcNotificationProviderService = rpcNotificationProviderService; + } + /** + * @return the maxTimeout + */ + public long getMaxTimeout() { + return maxTimeout; + } + /** + * @param maxTimeout the maxTimeout to set + */ + public void setMaxTimeout(long maxTimeout) { + this.maxTimeout = maxTimeout; + } + /** + * @return the maxTimeoutUnit + */ + public TimeUnit getMaxTimeoutUnit() { + return maxTimeoutUnit; + } + /** + * @param maxTimeoutUnit the maxTimeoutUnit to set + */ + public void setMaxTimeoutUnit(TimeUnit maxTimeoutUnit) { + this.maxTimeoutUnit = maxTimeoutUnit; + } + /** + * @param rpcPool + */ + public void setRpcPool(ListeningExecutorService rpcPool) { + this.rpcPool = rpcPool; + } + + /** + * @return the rpcPool + */ + public ListeningExecutorService getRpcPool() { + return rpcPool; + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java index 2bd8346d13..921d49421b 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java @@ -8,101 +8,400 @@ package org.opendaylight.openflowplugin.openflow.md.core.sal; import java.math.BigInteger; +import java.util.Collection; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.MeterConvertor; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInputBuilder; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + /** * */ public abstract class OFRpcTaskFactory { /** - * @param maxTimeout - * @param maxTimeoutUnit - * @param helper + * @param taskContext + * @param input + * @param cookie * @return UpdateFlow task */ public static OFRpcTask> createAddFlowTask( - final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) { - OFRpcTask> task = - new OFRpcTask>() { - + OFRpcTaskContext taskContext, AddFlowInput input, + SwitchConnectionDistinguisher cookie) { + OFRpcTask> task = + new OFRpcTask>(taskContext, cookie, input) { + @Override - public void run() { - helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().isBarrier(), getCookie(), getResult()); - if (getResult().isDone()) { - return; - } - - // Convert the AddFlowInput to FlowModInput - FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), - getVersion(), getSession().getFeatures().getDatapathId()); - Long xId = getSession().getNextXid(); - ofFlowModInput.setXid(xId); - - if (null != getRpcNotificationProviderService()) { - FlowAddedBuilder newFlow = new FlowAddedBuilder( - (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput()); - newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); - newFlow.setFlowRef(getInput().getFlowRef()); - getRpcNotificationProviderService().publish(newFlow.build()); + public ListenableFuture> call() { + ListenableFuture> result = SettableFuture.create(); + + Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); + if (!barrierErrors.isEmpty()) { + OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); + } else { + // Convert the AddFlowInput to FlowModInput + FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(), + getVersion(), getSession().getFeatures().getDatapathId()); + final Long xId = getSession().getNextXid(); + ofFlowModInput.setXid(xId); + + Future> resultFromOFLib = + getMessageService().flowMod(ofFlowModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), + createFlowAddedNotification(xId, getInput())); } - Future> resultFromOFLib = - getMessageService().flowMod(ofFlowModInput.build(), getCookie()); - OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult()); + return result; } }; + return task; } /** - * @param maxTimeout - * @param maxTimeoutUnit - * @param helper + * @param xId + * @return + */ + protected static NotificationComposer createFlowAddedNotification( + final Long xId, final AddFlowInput input) { + return new NotificationComposer() { + @Override + public FlowAdded compose() { + FlowAddedBuilder newFlow = new FlowAddedBuilder((Flow) input); + newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); + newFlow.setFlowRef(input.getFlowRef()); + return newFlow.build(); + } + }; + } + + /** + * @param taskContext + * @param input + * @param cookie * @return UpdateFlow task */ public static OFRpcTask> createUpdateFlowTask( - final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) { - OFRpcTask> task = - new OFRpcTask>() { + OFRpcTaskContext taskContext, UpdateFlowInput input, + SwitchConnectionDistinguisher cookie) { + + OFRpcTask> task = + new OFRpcTask>(taskContext, cookie, input) { + + @Override + public ListenableFuture> call() { + ListenableFuture> result = null; + Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), + getInput().getUpdatedFlow().isBarrier(), getCookie()); + if (!barrierErrors.isEmpty()) { + OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); + } else { + // Convert the AddFlowInput to FlowModInput + FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(), + getVersion(), getSession().getFeatures().getDatapathId()); + Long xId = getSession().getNextXid(); + ofFlowModInput.setXid(xId); + + Future> resultFromOFLib = + getMessageService().flowMod(ofFlowModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), + createFlowUpdatedNotification(xId, getInput())); + } + return result; + } + }; + return task; + } + /** + * @param xId + * @param input + * @return + */ + protected static NotificationComposer createFlowUpdatedNotification( + final Long xId, final UpdateFlowInput input) { + return new NotificationComposer() { + @Override + public FlowUpdated compose() { + FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(input.getUpdatedFlow()); + updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); + updFlow.setFlowRef(input.getFlowRef()); + return updFlow.build(); + } + }; + } + + /** + * @param taskContext + * @param input + * @param cookie + * @return update group task + */ + public static OFRpcTask> createAddGroupTask( + final OFRpcTaskContext taskContext, AddGroupInput input, + final SwitchConnectionDistinguisher cookie) { + OFRpcTask> task = + new OFRpcTask>(taskContext, cookie, input) { + @Override - public void run() { - helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().getUpdatedFlow().isBarrier(), getCookie(), getResult()); - if (getResult().isDone()) { - return; + public ListenableFuture> call() { + ListenableFuture> result = SettableFuture.create(); + + Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); + if (!barrierErrors.isEmpty()) { + OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); + } else { + // Convert the AddGroupInput to GroupModInput + GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(), + getVersion(), getSession().getFeatures().getDatapathId()); + final Long xId = getSession().getNextXid(); + ofGroupModInput.setXid(xId); + + Future> resultFromOFLib = getMessageService() + .groupMod(ofGroupModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), + createGroupAddedNotification(xId, getInput())); } - // Convert the AddFlowInput to FlowModInput - FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(), - getVersion(), getSession().getFeatures().getDatapathId()); - Long xId = getSession().getNextXid(); - ofFlowModInput.setXid(xId); + return result; + } + }; + + return task; + } + - if (null != getRpcNotificationProviderService()) { - FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(getInput().getUpdatedFlow()); - updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); - updFlow.setFlowRef(getInput().getFlowRef()); - getRpcNotificationProviderService().publish(updFlow.build()); + /** + * @param xId + * @param input + * @return + */ + protected static NotificationComposer createGroupAddedNotification( + final Long xId, final AddGroupInput input) { + return new NotificationComposer() { + @Override + public GroupAdded compose() { + GroupAddedBuilder groupMod = new GroupAddedBuilder((Group) input); + groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); + groupMod.setGroupRef(input.getGroupRef()); + return groupMod.build(); + } + }; + } + + /** + * @param taskContext + * @param input + * @param cookie + * @return update meter task + */ + public static OFRpcTask> createAddMeterTask( + OFRpcTaskContext taskContext, AddMeterInput input, + SwitchConnectionDistinguisher cookie) { + OFRpcTask> task = + new OFRpcTask>(taskContext, cookie, input) { + + @Override + public ListenableFuture> call() { + ListenableFuture> result = SettableFuture.create(); + + Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie()); + if (!barrierErrors.isEmpty()) { + OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); + } else { + // Convert the AddGroupInput to GroupModInput + MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion()); + final Long xId = getSession().getNextXid(); + ofMeterModInput.setXid(xId); + + Future> resultFromOFLib = getMessageService() + .meterMod(ofMeterModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), + createMeterAddedNotification(xId, getInput())); } - Future> resultFromOFLib = - getMessageService().flowMod(ofFlowModInput.build(), getCookie()); - OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult()); + return result; } }; + return task; + + } + + /** + * @param xId + * @param input + * @return + */ + protected static NotificationComposer createMeterAddedNotification( + final Long xId, final AddMeterInput input) { + return new NotificationComposer() { + @Override + public MeterAdded compose() { + MeterAddedBuilder meterMod = new MeterAddedBuilder((Meter) input); + meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); + meterMod.setMeterRef(input.getMeterRef()); + return meterMod.build(); + } + }; + } + + /** + * @param taskContext + * @param input + * @param cookie + * @return UpdateFlow task + */ + public static OFRpcTask> createUpdateGroupTask( + OFRpcTaskContext taskContext, UpdateGroupInput input, + SwitchConnectionDistinguisher cookie) { + OFRpcTask> task = + new OFRpcTask>(taskContext, cookie, input) { + + @Override + public ListenableFuture> call() { + ListenableFuture> result = null; + Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), + getInput().getUpdatedGroup().isBarrier(), getCookie()); + if (!barrierErrors.isEmpty()) { + OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); + } else { + // Convert the UpdateGroupInput to GroupModInput + GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput( + getInput().getUpdatedGroup(), getVersion(), + getSession().getFeatures().getDatapathId()); + final Long xId = getSession().getNextXid(); + ofGroupModInput.setXid(xId); + + Future> resultFromOFLib = + getMessageService().groupMod(ofGroupModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), + createGroupUpdatedNotification(xId, getInput())); + } + return result; + } + }; + return task; + } + + /** + * @param xId + * @param input + * @return + */ + protected static NotificationComposer createGroupUpdatedNotification( + final Long xId, final UpdateGroupInput input) { + return new NotificationComposer() { + @Override + public GroupUpdated compose() { + GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup()); + groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); + groupMod.setGroupRef(input.getGroupRef()); + return groupMod.build(); + } + }; } + /** + * @param taskContext + * @param input + * @param cookie + * @return update meter task + */ + public static OFRpcTask> createUpdateMeterTask( + OFRpcTaskContext taskContext, UpdateMeterInput input, + SwitchConnectionDistinguisher cookie) { + OFRpcTask> task = + new OFRpcTask>(taskContext, cookie, input) { + + @Override + public ListenableFuture> call() { + ListenableFuture> result = null; + Collection barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), + getInput().getUpdatedMeter().isBarrier(), getCookie()); + if (!barrierErrors.isEmpty()) { + OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture>) result), barrierErrors); + } else { + // Convert the UpdateMeterInput to MeterModInput + MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput( + getInput().getUpdatedMeter(), getVersion()); + final Long xId = getSession().getNextXid(); + ofMeterModInput.setXid(xId); + + Future> resultFromOFLib = + getMessageService().meterMod(ofMeterModInput.build(), getCookie()); + result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); + + OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), + createMeterUpdatedNotification(xId, getInput())); + } + return result; + } + }; + return task; + } + + /** + * @param xId + * @param input + * @return + */ + protected static NotificationComposer createMeterUpdatedNotification( + final Long xId, final UpdateMeterInput input) { + return new NotificationComposer() { + @Override + public MeterUpdated compose() { + MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter()); + meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue()))); + meterMod.setMeterRef(input.getMeterRef()); + return meterMod.build(); + } + }; + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java deleted file mode 100644 index d844a65b69..0000000000 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskHelper.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.openflowplugin.openflow.md.core.sal; - -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.controller.sal.common.util.Rpcs; -import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; -import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; -import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; -import org.opendaylight.yangtools.yang.common.RpcResult; - -import com.google.common.base.Objects; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.JdkFutureAdapters; -import com.google.common.util.concurrent.SettableFuture; - -/** - * - */ -public class OFRpcTaskHelper { - - private IMessageDispatchService messageService; - private SessionContext session; - private NotificationProviderService rpcNotificationProviderService; - /** - * @param cookie - * @param messageService - * @param session - * @param rpcNotificationProviderService - */ - public OFRpcTaskHelper(IMessageDispatchService messageService, SessionContext session, - NotificationProviderService rpcNotificationProviderService) { - this.messageService = messageService; - this.session = session; - this.rpcNotificationProviderService = rpcNotificationProviderService; - } - - - /** - * @param task - * @param input - * @param cookie - * @return inited task - */ - public OFRpcTask initTask(OFRpcTask task, T input, SwitchConnectionDistinguisher cookie) { - task.setMessageService(messageService); - task.setSession(session); - task.setRpcNotificationProviderService(rpcNotificationProviderService); - task.setResult(SettableFuture.create()); - task.setCookie(cookie); - task.setInput(input); - return task; - } - - /** - * @param intern - * @param wrapper - */ - public static void chainFutures(final Future intern, final SettableFuture wrapper) { - Futures.addCallback( - JdkFutureAdapters.listenInPoolThread(intern), - new FutureCallback() { - - @Override - public void onSuccess( - K result) { - wrapper.set(result); - } - - @Override - public void onFailure(Throwable t) { - wrapper.setException(t); - } - - }); - } - - /** - * @param maxTimeout - * @param maxTimeoutUnit - * @param isBarrier - * @param cookie - * @param result - */ - public void rawBarrierSend(final long maxTimeout, final TimeUnit maxTimeoutUnit, - Boolean isBarrier, SwitchConnectionDistinguisher cookie, SettableFuture> result) { - if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) { - Future> barrierFuture = ModelDrivenSwitchImpl.sendBarrier(cookie, session, messageService); - try { - RpcResult barrierResult = barrierFuture.get(maxTimeout, maxTimeoutUnit); - if (!barrierResult.isSuccessful()) { - result.set(Rpcs.getRpcResult(false, barrierResult.getErrors())); - } - } catch (Exception e) { - result.setException(e); - } - } - } -} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java new file mode 100644 index 0000000000..03662fd36a --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core.sal; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.Future; + +import org.opendaylight.controller.sal.binding.api.NotificationProviderService; +import org.opendaylight.controller.sal.common.util.RpcErrors; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.openflowplugin.openflow.md.OFConstants; +import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory; +import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; +import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; +import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; +import org.opendaylight.yangtools.yang.binding.Notification; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; +import org.opendaylight.yangtools.yang.common.RpcResult; + +import com.google.common.base.Objects; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * + */ +public abstract class OFRpcTaskUtil { + + /** + * @param taskContext + * @param isBarrier + * @param cookie + * @return rpcResult of given type, containing wrapped errors of barrier sending (if any) or success + */ + public static Collection manageBarrier(OFRpcTaskContext taskContext, Boolean isBarrier, + SwitchConnectionDistinguisher cookie) { + Collection errors = null; + if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) { + Future> barrierFuture = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService()); + try { + RpcResult barrierResult = barrierFuture.get( + taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit()); + if (!barrierResult.isSuccessful()) { + errors = barrierResult.getErrors(); + } + } catch (Exception e) { + RpcError rpcError = RpcErrors.getRpcError( + OFConstants.APPLICATION_TAG, OFConstants.ERROR_TAG_TIMEOUT, + "barrier sending failed", ErrorSeverity.WARNING, + "switch failed to respond on barrier request - message ordering is not preserved", ErrorType.RPC, e); + errors = Lists.newArrayList(rpcError); + } + } + + if (errors == null) { + errors = Collections.emptyList(); + } + + return errors; + } + + /** + * @param session + * @param cookie + * @param messageService + * @return barrier response + */ + private static Future> sendBarrier(SessionContext session, + SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) { + BarrierInput barrierInput = MessageFactory.createBarrier( + session.getFeatures().getVersion(), session.getNextXid()); + return messageService.barrier(barrierInput, cookie); + } + + /** + * @param result rpcResult with success = false, errors = given collection + * @param barrierErrors + */ + public static void wrapBarrierErrors(SettableFuture> result, + Collection barrierErrors) { + result.set(Rpcs.getRpcResult(false, barrierErrors)); + } + + /** + * @param originalResult + * @param notificationProviderService + * @param notificationComposer lazy notification composer + */ + public static void hookFutureNotification(ListenableFuture originalResult, + final NotificationProviderService notificationProviderService, + final NotificationComposer notificationComposer) { + Futures.addCallback(originalResult, new FutureCallback() { + @Override + public void onSuccess(R result) { + if (null != notificationProviderService) { + notificationProviderService.publish(notificationComposer.compose()); + } + } + + @Override + public void onFailure(Throwable t) { + //NOOP + } + }); + + } + +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java index 0f127e8338..610e7d340b 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java @@ -8,8 +8,6 @@ package org.opendaylight.openflowplugin.openflow.md.core.session; import java.math.BigInteger; -import java.util.Collection; -import java.util.Collections; import java.util.concurrent.Future; import org.opendaylight.controller.sal.common.util.Rpcs; @@ -47,12 +45,14 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder; -import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; /** * message dispatch service to send the message to switch. @@ -119,24 +119,29 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService { } @Override - public Future> flowMod(FlowModInput input, SwitchConnectionDistinguisher cookie) { + public Future> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) { LOG.debug("Calling OFLibrary flowMod"); Future> response = getConnectionAdapter(cookie).flowMod(input); - // Send the same Xid back to caller - MessageDrivenSwitch - UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder(); - BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ; - flowModOutput.setTransactionId(new TransactionId(bigIntXid)); - - UpdateFlowOutput result = flowModOutput.build(); - Collection errors = Collections.emptyList(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - // solution 1: sending directly and hooking listener to get error - // hookup listener to catch the possible error with no reference to returned future-object - LOG.debug("Returning to ModelDrivenSwitch for flowMod RPC"); - return Futures.immediateFuture(rpcResult); - + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function,RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ; + flowModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateFlowOutput result = flowModOutput.build(); + RpcResult rpcResult = Rpcs.getRpcResult( + inputArg.isSuccessful(), result, inputArg.getErrors()); + return rpcResult; + } + }); + + return xidResult; } @Override @@ -161,45 +166,55 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService { } @Override - public Future> groupMod(GroupModInput input, SwitchConnectionDistinguisher cookie) { + public Future> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) { LOG.debug("Calling OFLibrary groupMod"); Future> response = getConnectionAdapter(cookie).groupMod(input); - // Send the same Xid back to caller - MessageDrivenSwitch - UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder(); - BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); - groupModOutput.setTransactionId(new TransactionId(bigIntXid)); - - UpdateGroupOutput result = groupModOutput.build(); - Collection errors = Collections.emptyList(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - // solution 1: sending directly and hooking listener to get error - // hookup listener to catch the possible error with no reference to returned future-object - LOG.debug("Returning to ModelDrivenSwitch for groupMod RPC"); - return Futures.immediateFuture(rpcResult); - + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function,RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + groupModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateGroupOutput result = groupModOutput.build(); + RpcResult rpcResult = Rpcs.getRpcResult( + inputArg.isSuccessful(), result, inputArg.getErrors()); + return rpcResult; + } + }); + + return xidResult; } @Override - public Future> meterMod(MeterModInput input, SwitchConnectionDistinguisher cookie) { + public Future> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) { LOG.debug("Calling OFLibrary meterMod"); Future> response = getConnectionAdapter(cookie).meterMod(input); - // Send the same Xid back to caller - MessageDrivenSwitch - UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder(); - BigInteger bigIntXid =BigInteger.valueOf(input.getXid()); - meterModOutput.setTransactionId(new TransactionId(bigIntXid)); + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function,RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + meterModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdateMeterOutput result = meterModOutput.build(); + RpcResult rpcResult = Rpcs.getRpcResult( + inputArg.isSuccessful(), result, inputArg.getErrors()); + return rpcResult; + } + }); - UpdateMeterOutput result = meterModOutput.build(); - Collection errors = Collections.emptyList(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - // solution 1: sending directly and hooking listener to get error - // hookup listener to catch the possible error with no reference to returned future-object - LOG.debug("Returning to ModelDrivenSwitch for meterMod RPC"); - return Futures.immediateFuture(rpcResult); - + return xidResult; } @Override @@ -213,22 +228,29 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService { } @Override - public Future> portMod(PortModInput input, SwitchConnectionDistinguisher cookie) { - + public Future> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) { LOG.debug("Calling OFLibrary portMod"); Future> response = getConnectionAdapter(cookie).portMod(input); - - // Send the same Xid back to caller - ModelDrivenSwitch - UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder(); - String stringXid =input.getXid().toString(); - BigInteger bigIntXid = new BigInteger( stringXid ); - portModOutput.setTransactionId(new TransactionId(bigIntXid)); - UpdatePortOutput result = portModOutput.build(); - Collection errors = Collections.emptyList(); - RpcResult rpcResult = Rpcs.getRpcResult(true, result, errors); - - LOG.debug("Returning to ModelDrivenSwitch for portMod RPC"); - return Futures.immediateFuture(rpcResult); + + // appending xid + ListenableFuture> xidResult = Futures.transform( + JdkFutureAdapters.listenInPoolThread(response), + new Function,RpcResult>() { + + @Override + public RpcResult apply(final RpcResult inputArg) { + UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder(); + BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); + portModOutput.setTransactionId(new TransactionId(bigIntXid)); + + UpdatePortOutput result = portModOutput.build(); + RpcResult rpcResult = Rpcs.getRpcResult( + inputArg.isSuccessful(), result, inputArg.getErrors()); + return rpcResult; + } + }); + + return xidResult; } @Override diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContextOFImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContextOFImpl.java index cb5b2190ae..764fef56d0 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContextOFImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContextOFImpl.java @@ -18,11 +18,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; - -import java.util.concurrent.TimeUnit; - import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch; import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java index a3204b3b76..6e16b6eb7d 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java @@ -11,7 +11,6 @@ package org.opendaylight.openflowplugin.openflow.md.core.session; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; @@ -25,6 +24,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; +import com.google.common.util.concurrent.ListeningExecutorService; + /** * @author mirehak */ @@ -116,10 +117,10 @@ public interface SessionManager extends AutoCloseable { /** * @param newFixedThreadPool */ - void setRpcPool(ExecutorService newFixedThreadPool); + void setRpcPool(ListeningExecutorService newFixedThreadPool); /** * @return the rpcPool instance */ - ExecutorService getRpcPool(); + ListeningExecutorService getRpcPool(); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java index 0caa5de64c..afda07fc38 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.api.data.DataProviderService; @@ -30,6 +29,8 @@ import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.ListeningExecutorService; + /** * @author mirehak */ @@ -41,11 +42,12 @@ public class SessionManagerOFImpl implements SessionManager { private Map>>> translatorMapping; private Map, Collection>> popListenerMapping; - protected ListenerRegistry sessionListeners; private NotificationProviderService notificationProviderService; private DataProviderService dataProviderService; + private ListeningExecutorService rpcPool; + /** * @return singleton instance @@ -196,8 +198,7 @@ public class SessionManagerOFImpl implements SessionManager { } } }; - private ExecutorService rpcPool; - + @Override public Map>>> getTranslatorMapping() { @@ -252,12 +253,12 @@ public class SessionManagerOFImpl implements SessionManager { } @Override - public void setRpcPool(ExecutorService rpcPool) { + public void setRpcPool(ListeningExecutorService rpcPool) { this.rpcPool = rpcPool; } @Override - public ExecutorService getRpcPool() { + public ListeningExecutorService getRpcPool() { return rpcPool; } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/TransactionKey.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/TransactionKey.java deleted file mode 100644 index bb2fae988c..0000000000 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/TransactionKey.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.opendaylight.openflowplugin.openflow.md.core.session; - -public class TransactionKey { - - private static final long serialVersionUID = 7805731164917659700L; - final private Long _xId; - - public TransactionKey(Long transactionId) { - this._xId = transactionId; - } - - public Long getXId() { - return _xId; - } - - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((_xId == null) ? 0 : _xId.hashCode()); - return result; - } - - @Override - public boolean equals(java.lang.Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - TransactionKey other = (TransactionKey) obj; - if (_xId == null) { - if (other._xId != null) { - return false; - } - } else if (!_xId.equals(other._xId)) { - return false; - } - return true; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("TransactionId [_xId="); - builder.append(_xId); - builder.append("]"); - return builder.toString(); - } - -} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/FlowCreatorUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/FlowCreatorUtil.java index 7eafce1e96..30b77fdf02 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/FlowCreatorUtil.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/FlowCreatorUtil.java @@ -19,7 +19,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.oxm.rev130731.matc import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.aggregate._case.MultipartRequestAggregateBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.flow._case.MultipartRequestFlowBuilder; -public class FlowCreatorUtil { +public abstract class FlowCreatorUtil { public static void setWildcardedFlowMatch(short version,MultipartRequestFlowBuilder flowBuilder){ if(version == OFConstants.OFP_VERSION_1_0){ diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/InventoryDataServiceUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/InventoryDataServiceUtil.java index 7aadc6c12f..81a00302c0 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/InventoryDataServiceUtil.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/InventoryDataServiceUtil.java @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory; import java.math.BigInteger; import java.util.List; -public class InventoryDataServiceUtil { +public abstract class InventoryDataServiceUtil { private final static Logger LOG = LoggerFactory.getLogger(InventoryDataServiceUtil.class); public final static String OF_URI_PREFIX = "openflow:"; diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PortTranslatorUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PortTranslatorUtil.java index 5ba0ab58c7..0e237f4366 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PortTranslatorUtil.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/util/PortTranslatorUtil.java @@ -24,7 +24,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortStateV10; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping; -public class PortTranslatorUtil { +public abstract class PortTranslatorUtil { public static org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortFeatures translatePortFeatures(PortFeatures apf) { org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortFeatures napf = null; if(apf != null){ diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImplTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImplTest.java index 41572000b6..f84f3fcf98 100644 --- a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImplTest.java +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImplTest.java @@ -12,7 +12,6 @@ import java.math.BigInteger; import java.util.Collections; import java.util.Set; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; @@ -21,7 +20,6 @@ import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; -import org.opendaylight.controller.sal.common.util.Futures; import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.openflowplugin.openflow.md.OFConstants; import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; @@ -29,7 +27,6 @@ import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistingu import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; -import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder; @@ -43,8 +40,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; /** * simple NPE smoke test @@ -76,7 +73,7 @@ public class ModelDrivenSwitchImplTest { Mockito.when(context.getFeatures()).thenReturn(features); Mockito.when(features.getDatapathId()).thenReturn(BigInteger.valueOf(1)); - OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10)); + OFSessionUtil.getSessionManager().setRpcPool(MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10))); mdSwitchOF10 = new ModelDrivenSwitchImpl(null, null, context); mdSwitchOF13 = new ModelDrivenSwitchImpl(null, null, context); diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SwitchFeaturesUtilTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SwitchFeaturesUtilTest.java index 57b202ad9b..764d7db58d 100644 --- a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SwitchFeaturesUtilTest.java +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SwitchFeaturesUtilTest.java @@ -7,9 +7,9 @@ */ package org.opendaylight.openflowplugin.openflow.md.core.sal; -import junit.framework.Assert; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities; -- 2.36.6