import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemovedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemovedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* RPC implementation of MD-switch
private final NodeId nodeId;
private final IMessageDispatchService messageService;
private short version = 0;
- private final SessionContext session;
- NotificationProviderService rpcNotificationProviderService;
- private OFRpcTaskHelper rpcTaskHelper;
+ private NotificationProviderService rpcNotificationProviderService;
+ private OFRpcTaskContext rpcTaskContext;
// TODO:read timeout from configSubsystem
protected long maxTimeout = 1000;
protected TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
- protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext context) {
- super(identifier, context);
+ protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier,
+ final SessionContext sessionContext) {
+ super(identifier, sessionContext);
this.nodeId = nodeId;
messageService = sessionContext.getMessageDispatchService();
- version = context.getPrimaryConductor().getVersion();
- this.session = context;
+ version = sessionContext.getPrimaryConductor().getVersion();
rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService();
- rpcTaskHelper = new OFRpcTaskHelper(messageService, context, rpcNotificationProviderService);
+
+ rpcTaskContext = new OFRpcTaskContext();
+ rpcTaskContext.setSession(sessionContext);
+ rpcTaskContext.setMessageService(messageService);
+ rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService);
+ rpcTaskContext.setMaxTimeout(maxTimeout);
+ rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
+ rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool());
}
@Override
SwitchConnectionDistinguisher cookie = null;
OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
- OFRpcTaskFactory.createAddFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
- rpcTaskHelper.initTask(task, input, cookie);
- OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+ OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit();
- return Futures.transform(JdkFutureAdapters.listenInPoolThread(task.getResult()),
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(result),
OFRpcFutureResultTransformFactory.createForAddFlowOutput());
}
@Override
public Future<RpcResult<AddGroupOutput>> addGroup(final AddGroupInput input) {
LOG.debug("Calling the GroupMod RPC method on MessageDispatchService");
- Long xId = null;
-
+
// use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the AddGroupInput to GroupModInput
- GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext()
- .getFeatures().getDatapathId());
- xId = session.getNextXid();
- ofGroupModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- GroupAddedBuilder groupMod = new GroupAddedBuilder(
- (org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group) input);
- groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- groupMod.setGroupRef(input.getGroupRef());
- rpcNotificationProviderService.publish(groupMod.build());
- }
-
- Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie);
- RpcResult<UpdateGroupOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for AddGroup RPC" + ex.getMessage());
- }
-
- UpdateGroupOutput updateGroupOutput = rpcResultFromOFLib.getResult();
-
- AddGroupOutputBuilder addGroupOutput = new AddGroupOutputBuilder();
- addGroupOutput.setTransactionId(updateGroupOutput.getTransactionId());
- AddGroupOutput result = addGroupOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<AddGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Add Group RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+ OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task =
+ OFRpcTaskFactory.createAddGroupTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateGroupOutput>> result = task.submit();
+
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(result),
+ OFRpcFutureResultTransformFactory.createForAddGroupOutput());
}
@Override
public Future<RpcResult<AddMeterOutput>> addMeter(final AddMeterInput input) {
LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
- Long xId = null;
- // For Meter provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
-
+
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the AddMeterInput to MeterModInput
- MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version);
- xId = session.getNextXid();
- ofMeterModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- MeterAddedBuilder meterMod = new MeterAddedBuilder(
- (org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter) input);
- meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- meterMod.setMeterRef(input.getMeterRef());
- rpcNotificationProviderService.publish(meterMod.build());
- }
-
- Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie);
-
- RpcResult<UpdateMeterOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for AddMeter RPC" + ex.getMessage());
- }
-
- UpdateMeterOutput updateMeterOutput = rpcResultFromOFLib.getResult();
-
- AddMeterOutputBuilder addMeterOutput = new AddMeterOutputBuilder();
- addMeterOutput.setTransactionId(updateMeterOutput.getTransactionId());
- AddMeterOutput result = addMeterOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<AddMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Add Meter RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+
+ OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task =
+ OFRpcTaskFactory.createAddMeterTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateMeterOutput>> result = task.submit();
+
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(result),
+ OFRpcFutureResultTransformFactory.createForAddMeterOutput());
}
@Override
SwitchConnectionDistinguisher cookie = null;
if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
barrierInput.setXid(xId);
barrierInput.setVersion(version);
@SuppressWarnings("unused")
}
// Convert the RemoveFlowInput to FlowModInput
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, this.getSessionContext()
+ FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, sessionContext
.getFeatures().getDatapathId());
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
ofFlowModInput.setXid(xId);
if (null != rpcNotificationProviderService) {
SwitchConnectionDistinguisher cookie = null;
if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
BarrierInputBuilder barrierInput = new BarrierInputBuilder();
barrierInput.setVersion(version);
barrierInput.setXid(xId);
// Convert the RemoveGroupInput to GroupModInput
GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext()
.getFeatures().getDatapathId());
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
ofGroupModInput.setXid(xId);
if (null != rpcNotificationProviderService) {
// the request can be routed through any connection to the switch
SwitchConnectionDistinguisher cookie = null;
if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
BarrierInputBuilder barrierInput = new BarrierInputBuilder();
barrierInput.setVersion(version);
barrierInput.setXid(xId);
// Convert the RemoveMeterInput to MeterModInput
MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version);
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
ofMeterModInput.setXid(xId);
if (null != rpcNotificationProviderService) {
SwitchConnectionDistinguisher cookie = null;
OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
- OFRpcTaskFactory.createUpdateFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
- rpcTaskHelper.initTask(task, input, cookie);
- OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+ OFRpcTaskFactory.createUpdateFlowTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit();
- return task.getResult();
+ return result;
}
@Override
public Future<RpcResult<UpdateGroupOutput>> updateGroup(final UpdateGroupInput input) {
LOG.debug("Calling the update Group Mod RPC method on MessageDispatchService");
- Long xId = null;
-
- // For Flow provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
-
+
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.getUpdatedGroup().isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the UpdateGroupInput to GroupModInput
- GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input.getUpdatedGroup(), version, this
- .getSessionContext().getFeatures().getDatapathId());
- xId = session.getNextXid();
- ofGroupModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
- groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- groupMod.setGroupRef(input.getGroupRef());
- rpcNotificationProviderService.publish(groupMod.build());
- }
-
- Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie);
-
- RpcResult<UpdateGroupOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for updateGroup RPC" + ex.getMessage());
- }
-
- UpdateGroupOutput updateGroupOutputOFLib = rpcResultFromOFLib.getResult();
-
- UpdateGroupOutputBuilder updateGroupOutput = new UpdateGroupOutputBuilder();
- updateGroupOutput.setTransactionId(updateGroupOutputOFLib.getTransactionId());
- UpdateGroupOutput result = updateGroupOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Update Group RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+
+ OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task =
+ OFRpcTaskFactory.createUpdateGroupTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateGroupOutput>> result = task.submit();
+
+ return result;
}
@Override
public Future<RpcResult<UpdateMeterOutput>> updateMeter(final UpdateMeterInput input) {
LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
- Long xId = null;
-
- // For Meter provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
+
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.getUpdatedMeter().isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the UpdateMeterInput to MeterModInput
- MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input.getUpdatedMeter(), version);
- xId = session.getNextXid();
- ofMeterModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
- meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- meterMod.setMeterRef(input.getMeterRef());
- rpcNotificationProviderService.publish(meterMod.build());
- }
-
- Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie);
-
- RpcResult<UpdateMeterOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for UpdateMeter RPC" + ex.getMessage());
- }
-
- UpdateMeterOutput updateMeterOutputFromOFLib = rpcResultFromOFLib.getResult();
-
- UpdateMeterOutputBuilder updateMeterOutput = new UpdateMeterOutputBuilder();
- updateMeterOutput.setTransactionId(updateMeterOutputFromOFLib.getTransactionId());
- UpdateMeterOutput result = updateMeterOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Update Meter RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+
+ OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task =
+ OFRpcTaskFactory.createUpdateMeterTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateMeterOutput>> result = task.submit();
+
+ return result;
}
@Override
RpcResult<GetQueueStatisticsFromGivenPortOutput> rpcResult = Rpcs.getRpcResult(true, output.build(), errors);
return Futures.immediateFuture(rpcResult);
}
-
- /**
- * @param input
- * @param cookie
- * @param session
- * @param messageService
- * @return barrier result
- */
- public static Future<RpcResult<BarrierOutput>> sendBarrier(
- SwitchConnectionDistinguisher cookie, SessionContext session,
- IMessageDispatchService messageService) {
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(session.getFeatures().getVersion());
- barrierInput.setXid(session.getNextXid());
- return messageService.barrier(barrierInput.build(), cookie);
- }
-
}