*/
package org.opendaylight.openflowplugin.openflow.md.core.sal;
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.openflowjava.protocol.api.util.BinContent;
import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemovedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutputBuilder;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Future;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
/**
* RPC implementation of MD-switch
private short version = 0;
private final SessionContext session;
NotificationProviderService rpcNotificationProviderService;
-
- protected ModelDrivenSwitchImpl(NodeId nodeId, InstanceIdentifier<Node> identifier, SessionContext context) {
+ private OFRpcTaskHelper rpcTaskHelper;
+
+ // TODO:read timeout from configSubsystem
+ protected long maxTimeout = 1000;
+ protected TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
+
+ protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext context) {
super(identifier, context);
this.nodeId = nodeId;
messageService = sessionContext.getMessageDispatchService();
version = context.getPrimaryConductor().getVersion();
this.session = context;
rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService();
+ rpcTaskHelper = new OFRpcTaskHelper(messageService, context, rpcNotificationProviderService);
}
@Override
- public Future<RpcResult<AddFlowOutput>> addFlow(AddFlowInput input) {
+ public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
- Long xId = null;
- // For Flow provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
-
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the AddFlowInput to FlowModInput
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, this.getSessionContext()
- .getFeatures().getDatapathId());
- xId = session.getNextXid();
- ofFlowModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- FlowAddedBuilder newFlow = new FlowAddedBuilder(
- (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) input);
- newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- newFlow.setFlowRef(input.getFlowRef());
- rpcNotificationProviderService.publish(newFlow.build());
- }
-
- session.getbulkTransactionCache().put(new TransactionKey(xId), input);
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = messageService.flowMod(ofFlowModInput.build(), cookie);
- RpcResult<UpdateFlowOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for AddFlow RPC" + ex.getMessage());
- }
-
- UpdateFlowOutput updateFlowOutput = rpcResultFromOFLib.getResult();
-
- AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder();
- addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId());
- AddFlowOutput result = addFlowOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<AddFlowOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Add Flow RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+
+ OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
+ OFRpcTaskFactory.createAddFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
+ rpcTaskHelper.initTask(task, input, cookie);
+ OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(task.getResult()),
+ OFRpcFutureResultTransformFactory.createForAddFlowOutput());
}
+
@Override
- public Future<RpcResult<AddGroupOutput>> addGroup(AddGroupInput input) {
+ public Future<RpcResult<AddGroupOutput>> addGroup(final AddGroupInput input) {
LOG.debug("Calling the GroupMod RPC method on MessageDispatchService");
Long xId = null;
- // For Flow provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
-
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
+
if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
xId = session.getNextXid();
BarrierInputBuilder barrierInput = new BarrierInputBuilder();
}
@Override
- public Future<RpcResult<AddMeterOutput>> addMeter(AddMeterInput input) {
+ public Future<RpcResult<AddMeterOutput>> addMeter(final AddMeterInput input) {
LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
Long xId = null;
// For Meter provisioning, the SwitchConnectionDistinguisher is set to
}
@Override
- public Future<RpcResult<RemoveFlowOutput>> removeFlow(RemoveFlowInput input) {
+ public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
LOG.debug("Calling the removeFlow RPC method on MessageDispatchService");
Long xId = null;
// For Flow provisioning, the SwitchConnectionDistinguisher is set to
}
@Override
- public Future<RpcResult<RemoveGroupOutput>> removeGroup(RemoveGroupInput input) {
+ public Future<RpcResult<RemoveGroupOutput>> removeGroup(final RemoveGroupInput input) {
LOG.debug("Calling the Remove Group RPC method on MessageDispatchService");
Long xId = null;
}
@Override
- public Future<RpcResult<RemoveMeterOutput>> removeMeter(RemoveMeterInput input) {
+ public Future<RpcResult<RemoveMeterOutput>> removeMeter(final RemoveMeterInput input) {
LOG.debug("Calling the Remove MeterMod RPC method on MessageDispatchService");
Long xId = null;
}
@Override
- public Future<RpcResult<Void>> transmitPacket(TransmitPacketInput input) {
+ public Future<RpcResult<Void>> transmitPacket(final TransmitPacketInput input) {
LOG.debug("TransmitPacket - {}", input);
// Convert TransmitPacket to PacketOutInput
PacketOutInput message = PacketOutConvertor.toPacketOutInput(input, version, sessionContext.getNextXid(),
return messageService.packetOut(message, cookie);
}
- private FlowModInputBuilder toFlowModInputBuilder(Flow source) {
+ private FlowModInputBuilder toFlowModInputBuilder(final Flow source) {
FlowModInputBuilder target = new FlowModInputBuilder();
target.setCookie(source.getCookie().getValue());
target.setIdleTimeout(source.getIdleTimeout());
return target;
}
- private Match toMatch(org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) {
+ private Match toMatch(final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) {
MatchBuilder target = new MatchBuilder();
target.setMatchEntries(toMatchEntries(match));
}
private List<MatchEntries> toMatchEntries(
- org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) {
+ final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match match) {
List<MatchEntries> entries = new ArrayList<>();
return null;
}
@Override
- public Future<RpcResult<UpdateFlowOutput>> updateFlow(UpdateFlowInput input) {
+ public Future<RpcResult<UpdateFlowOutput>> updateFlow(final UpdateFlowInput input) {
LOG.debug("Calling the updateFlow RPC method on MessageDispatchService");
- Long xId = null;
- // Call the RPC method on MessageDispatchService
-
- // For Flow provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
-
+
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.getUpdatedFlow().isBarrier(), Boolean.FALSE)) {
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- xId = session.getNextXid();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the UpdateFlowInput to FlowModInput
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input.getUpdatedFlow(), version, this
- .getSessionContext().getFeatures().getDatapathId());
- xId = session.getNextXid();
- ofFlowModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- FlowUpdatedBuilder updateFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
- updateFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- updateFlow.setFlowRef(input.getFlowRef());
- rpcNotificationProviderService.publish(updateFlow.build());
- }
-
- session.getbulkTransactionCache().put(new TransactionKey(xId), input);
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLib = messageService.flowMod(ofFlowModInput.build(), cookie);
-
- RpcResult<UpdateFlowOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for UpdateFlow RPC" + ex.getMessage());
- }
-
- UpdateFlowOutput updateFlowOutputOFLib = rpcResultFromOFLib.getResult();
-
- UpdateFlowOutputBuilder updateFlowOutput = new UpdateFlowOutputBuilder();
- updateFlowOutput.setTransactionId(updateFlowOutputOFLib.getTransactionId());
- UpdateFlowOutput result = updateFlowOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Update Flow RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+
+ OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
+ OFRpcTaskFactory.createUpdateFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
+ rpcTaskHelper.initTask(task, input, cookie);
+ OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+
+ return task.getResult();
}
@Override
- public Future<RpcResult<UpdateGroupOutput>> updateGroup(UpdateGroupInput input) {
+ public Future<RpcResult<UpdateGroupOutput>> updateGroup(final UpdateGroupInput input) {
LOG.debug("Calling the update Group Mod RPC method on MessageDispatchService");
Long xId = null;
}
@Override
- public Future<RpcResult<UpdateMeterOutput>> updateMeter(UpdateMeterInput input) {
+ public Future<RpcResult<UpdateMeterOutput>> updateMeter(final UpdateMeterInput input) {
LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
Long xId = null;
* Methods for requesting statistics from switch
*/
@Override
- public Future<RpcResult<GetAllGroupStatisticsOutput>> getAllGroupStatistics(GetAllGroupStatisticsInput input) {
+ public Future<RpcResult<GetAllGroupStatisticsOutput>> getAllGroupStatistics(final GetAllGroupStatisticsInput input) {
GetAllGroupStatisticsOutputBuilder output = new GetAllGroupStatisticsOutputBuilder();
Collection<RpcError> errors = Collections.emptyList();
}
@Override
- public Future<RpcResult<GetGroupDescriptionOutput>> getGroupDescription(GetGroupDescriptionInput input) {
+ public Future<RpcResult<GetGroupDescriptionOutput>> getGroupDescription(final GetGroupDescriptionInput input) {
GetGroupDescriptionOutputBuilder output = new GetGroupDescriptionOutputBuilder();
Collection<RpcError> errors = Collections.emptyList();
}
@Override
- public Future<RpcResult<GetGroupFeaturesOutput>> getGroupFeatures(GetGroupFeaturesInput input) {
+ public Future<RpcResult<GetGroupFeaturesOutput>> getGroupFeatures(final GetGroupFeaturesInput input) {
GetGroupFeaturesOutputBuilder output = new GetGroupFeaturesOutputBuilder();
Collection<RpcError> errors = Collections.emptyList();
}
@Override
- public Future<RpcResult<GetGroupStatisticsOutput>> getGroupStatistics(GetGroupStatisticsInput input) {
+ public Future<RpcResult<GetGroupStatisticsOutput>> getGroupStatistics(final GetGroupStatisticsInput input) {
GetGroupStatisticsOutputBuilder output = new GetGroupStatisticsOutputBuilder();
Collection<RpcError> errors = Collections.emptyList();
@Override
public Future<RpcResult<GetAllMeterConfigStatisticsOutput>> getAllMeterConfigStatistics(
- GetAllMeterConfigStatisticsInput input) {
+ final GetAllMeterConfigStatisticsInput input) {
GetAllMeterConfigStatisticsOutputBuilder output = new GetAllMeterConfigStatisticsOutputBuilder();
Collection<RpcError> errors = Collections.emptyList();
}
@Override
- public Future<RpcResult<GetAllMeterStatisticsOutput>> getAllMeterStatistics(GetAllMeterStatisticsInput input) {
+ public Future<RpcResult<GetAllMeterStatisticsOutput>> getAllMeterStatistics(final GetAllMeterStatisticsInput input) {
GetAllMeterStatisticsOutputBuilder output = new GetAllMeterStatisticsOutputBuilder();
Collection<RpcError> errors = Collections.emptyList();
}
@Override
- public Future<RpcResult<GetMeterFeaturesOutput>> getMeterFeatures(GetMeterFeaturesInput input) {
+ public Future<RpcResult<GetMeterFeaturesOutput>> getMeterFeatures(final GetMeterFeaturesInput input) {
GetMeterFeaturesOutputBuilder output = new GetMeterFeaturesOutputBuilder();
Collection<RpcError> errors = Collections.emptyList();
}
@Override
- public Future<RpcResult<GetMeterStatisticsOutput>> getMeterStatistics(GetMeterStatisticsInput input) {
+ public Future<RpcResult<GetMeterStatisticsOutput>> getMeterStatistics(final GetMeterStatisticsInput input) {
GetMeterStatisticsOutputBuilder output = new GetMeterStatisticsOutputBuilder();
Collection<RpcError> errors = Collections.emptyList();
@Override
public Future<RpcResult<GetAllNodeConnectorsStatisticsOutput>> getAllNodeConnectorsStatistics(
- GetAllNodeConnectorsStatisticsInput arg0) {
+ final GetAllNodeConnectorsStatisticsInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
@Override
public Future<RpcResult<GetNodeConnectorStatisticsOutput>> getNodeConnectorStatistics(
- GetNodeConnectorStatisticsInput arg0) {
+ final GetNodeConnectorStatisticsInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
return Futures.immediateFuture(rpcResult);
}
- private TransactionId generateTransactionId(Long xid) {
+ private TransactionId generateTransactionId(final Long xid) {
String stringXid = xid.toString();
BigInteger bigIntXid = new BigInteger(stringXid);
return new TransactionId(bigIntXid);
}
@Override
- public Future<RpcResult<UpdatePortOutput>> updatePort(UpdatePortInput input) {
+ public Future<RpcResult<UpdatePortOutput>> updatePort(final UpdatePortInput input) {
PortModInput ofPortModInput = null;
RpcResult<UpdatePortOutput> rpcResultFromOFLib = null;
}
@Override
- public Future<RpcResult<UpdateTableOutput>> updateTable(UpdateTableInput input) {
+ public Future<RpcResult<UpdateTableOutput>> updateTable(final UpdateTableInput input) {
// Get the Xid. The same Xid has to be sent in all the Multipart
// requests
@Override
public Future<RpcResult<GetAllFlowStatisticsFromFlowTableOutput>> getAllFlowStatisticsFromFlowTable(
- GetAllFlowStatisticsFromFlowTableInput arg0) {
+ final GetAllFlowStatisticsFromFlowTableInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
@Override
public Future<RpcResult<GetAllFlowsStatisticsFromAllFlowTablesOutput>> getAllFlowsStatisticsFromAllFlowTables(
- GetAllFlowsStatisticsFromAllFlowTablesInput arg0) {
+ final GetAllFlowsStatisticsFromAllFlowTablesInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
@Override
public Future<RpcResult<GetFlowStatisticsFromFlowTableOutput>> getFlowStatisticsFromFlowTable(
- GetFlowStatisticsFromFlowTableInput arg0) {
+ final GetFlowStatisticsFromFlowTableInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
@Override
public Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForAllFlowsOutput>> getAggregateFlowStatisticsFromFlowTableForAllFlows(
- GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput arg0) {
+ final GetAggregateFlowStatisticsFromFlowTableForAllFlowsInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
@Override
public Future<RpcResult<GetAggregateFlowStatisticsFromFlowTableForGivenMatchOutput>> getAggregateFlowStatisticsFromFlowTableForGivenMatch(
- GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput arg0) {
+ final GetAggregateFlowStatisticsFromFlowTableForGivenMatchInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
}
@Override
- public Future<RpcResult<GetFlowTablesStatisticsOutput>> getFlowTablesStatistics(GetFlowTablesStatisticsInput arg0) {
+ public Future<RpcResult<GetFlowTablesStatisticsOutput>> getFlowTablesStatistics(final GetFlowTablesStatisticsInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
@Override
public Future<RpcResult<GetAllQueuesStatisticsFromAllPortsOutput>> getAllQueuesStatisticsFromAllPorts(
- GetAllQueuesStatisticsFromAllPortsInput arg0) {
+ final GetAllQueuesStatisticsFromAllPortsInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
@Override
public Future<RpcResult<GetAllQueuesStatisticsFromGivenPortOutput>> getAllQueuesStatisticsFromGivenPort(
- GetAllQueuesStatisticsFromGivenPortInput arg0) {
+ final GetAllQueuesStatisticsFromGivenPortInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
@Override
public Future<RpcResult<GetQueueStatisticsFromGivenPortOutput>> getQueueStatisticsFromGivenPort(
- GetQueueStatisticsFromGivenPortInput arg0) {
+ final GetQueueStatisticsFromGivenPortInput arg0) {
// Generate xid to associate it with the request
Long xid = this.getSessionContext().getNextXid();
RpcResult<GetQueueStatisticsFromGivenPortOutput> rpcResult = Rpcs.getRpcResult(true, output.build(), errors);
return Futures.immediateFuture(rpcResult);
}
+
+ /**
+ * @param input
+ * @param cookie
+ * @param session
+ * @param messageService
+ * @return barrier result
+ */
+ public static Future<RpcResult<BarrierOutput>> sendBarrier(
+ SwitchConnectionDistinguisher cookie, SessionContext session,
+ IMessageDispatchService messageService) {
+ BarrierInputBuilder barrierInput = new BarrierInputBuilder();
+ barrierInput.setVersion(session.getFeatures().getVersion());
+ barrierInput.setXid(session.getNextXid());
+ return messageService.barrier(barrierInput.build(), cookie);
+ }
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import java.math.BigInteger;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
+import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+/**
+ *
+ */
+public abstract class OFRpcTaskFactory {
+
+ /**
+ * @param maxTimeout
+ * @param maxTimeoutUnit
+ * @param helper
+ * @return UpdateFlow task
+ */
+ public static OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> createAddFlowTask(
+ final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
+ OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
+ new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>() {
+
+ @Override
+ public void run() {
+ helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().isBarrier(), getCookie(), getResult());
+ if (getResult().isDone()) {
+ return;
+ }
+
+ // Convert the AddFlowInput to FlowModInput
+ FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(),
+ getVersion(), getSession().getFeatures().getDatapathId());
+ Long xId = getSession().getNextXid();
+ ofFlowModInput.setXid(xId);
+
+ if (null != getRpcNotificationProviderService()) {
+ FlowAddedBuilder newFlow = new FlowAddedBuilder(
+ (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput());
+ newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+ newFlow.setFlowRef(getInput().getFlowRef());
+ getRpcNotificationProviderService().publish(newFlow.build());
+ }
+
+ getSession().getbulkTransactionCache().put(new TransactionKey(xId), getInput());
+ Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
+ getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+ OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+ }
+ };
+ return task;
+ }
+
+ /**
+ * @param maxTimeout
+ * @param maxTimeoutUnit
+ * @param helper
+ * @return UpdateFlow task
+ */
+ public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
+ final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
+ OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
+ new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>() {
+
+ @Override
+ public void run() {
+ helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().getUpdatedFlow().isBarrier(), getCookie(), getResult());
+ if (getResult().isDone()) {
+ return;
+ }
+
+ // Convert the AddFlowInput to FlowModInput
+ FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(),
+ getVersion(), getSession().getFeatures().getDatapathId());
+ Long xId = getSession().getNextXid();
+ ofFlowModInput.setXid(xId);
+
+ if (null != getRpcNotificationProviderService()) {
+ FlowAddedBuilder newFlow = new FlowAddedBuilder(
+ (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput());
+ newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+ newFlow.setFlowRef(getInput().getFlowRef());
+ getRpcNotificationProviderService().publish(newFlow.build());
+ }
+
+ getSession().getbulkTransactionCache().put(new TransactionKey(xId), getInput());
+ Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
+ getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+ OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+ }
+ };
+ return task;
+ }
+
+}