public static final int MAC_ADDRESS_LENGTH = 6;
public static final int SIZE_OF_LONG_IN_BYTES = 8;
public static final int SIGNUM_UNSIGNED = 1;
+
+ /** RpcError application tag */
+ public static final String APPLICATION_TAG = "OPENFLOW_PLUGIN";
+ /** RpcError tag - timeout */
+ public static final String ERROR_TAG_TIMEOUT = "TIMOUT";
}
package org.opendaylight.openflowplugin.openflow.md.core;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
/**
- * @author mirehak
*
*/
public class MDController implements IMDController, AutoCloseable {
final private int OF13 = OFConstants.OFP_VERSION_1_3;
private ErrorHandlerSimpleImpl errorHandler;
- private ExecutorService rpcPool;
-
/**
* @return translator mapping
// prepare worker pool for rpc
// TODO: get size from configSubsystem
- OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10));
+ OFSessionUtil.getSessionManager().setRpcPool(
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)));
}
import java.util.List;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
}
return result;
}
+
+ /**
+ * @param ofVersion
+ * @param ofXid
+ * @return barrier message
+ */
+ public static BarrierInput createBarrier(short ofVersion, long ofXid) {
+ BarrierInputBuilder barrierInput = new BarrierInputBuilder();
+ barrierInput.setVersion(ofVersion);
+ barrierInput.setXid(ofXid);
+ return barrierInput.build();
+ }
+
+// /**
+// * @param input
+// * @param cookie
+// * @param session
+// * @param messageService
+// * @return barrier result
+// */
+// public static Future<RpcResult<BarrierOutput>> sendBarrier(
+// SwitchConnectionDistinguisher cookie, SessionContext session,
+// IMessageDispatchService messageService) {
+// BarrierInputBuilder barrierInput = new BarrierInputBuilder();
+// barrierInput.setVersion(session.getFeatures().getVersion());
+// barrierInput.setXid(session.getNextXid());
+// return messageService.barrier(barrierInput.build(), cookie);
+// }
}
import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
import org.opendaylight.openflowplugin.openflow.md.util.FlowCreatorUtil;
import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemovedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemovedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* RPC implementation of MD-switch
private final NodeId nodeId;
private final IMessageDispatchService messageService;
private short version = 0;
- private final SessionContext session;
- NotificationProviderService rpcNotificationProviderService;
- private OFRpcTaskHelper rpcTaskHelper;
+ private NotificationProviderService rpcNotificationProviderService;
+ private OFRpcTaskContext rpcTaskContext;
// TODO:read timeout from configSubsystem
protected long maxTimeout = 1000;
protected TimeUnit maxTimeoutUnit = TimeUnit.MILLISECONDS;
- protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext context) {
- super(identifier, context);
+ protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier,
+ final SessionContext sessionContext) {
+ super(identifier, sessionContext);
this.nodeId = nodeId;
messageService = sessionContext.getMessageDispatchService();
- version = context.getPrimaryConductor().getVersion();
- this.session = context;
+ version = sessionContext.getPrimaryConductor().getVersion();
rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService();
- rpcTaskHelper = new OFRpcTaskHelper(messageService, context, rpcNotificationProviderService);
+
+ rpcTaskContext = new OFRpcTaskContext();
+ rpcTaskContext.setSession(sessionContext);
+ rpcTaskContext.setMessageService(messageService);
+ rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService);
+ rpcTaskContext.setMaxTimeout(maxTimeout);
+ rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
+ rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool());
}
@Override
SwitchConnectionDistinguisher cookie = null;
OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
- OFRpcTaskFactory.createAddFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
- rpcTaskHelper.initTask(task, input, cookie);
- OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+ OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit();
- return Futures.transform(JdkFutureAdapters.listenInPoolThread(task.getResult()),
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(result),
OFRpcFutureResultTransformFactory.createForAddFlowOutput());
}
@Override
public Future<RpcResult<AddGroupOutput>> addGroup(final AddGroupInput input) {
LOG.debug("Calling the GroupMod RPC method on MessageDispatchService");
- Long xId = null;
-
+
// use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the AddGroupInput to GroupModInput
- GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext()
- .getFeatures().getDatapathId());
- xId = session.getNextXid();
- ofGroupModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- GroupAddedBuilder groupMod = new GroupAddedBuilder(
- (org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group) input);
- groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- groupMod.setGroupRef(input.getGroupRef());
- rpcNotificationProviderService.publish(groupMod.build());
- }
-
- Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie);
- RpcResult<UpdateGroupOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for AddGroup RPC" + ex.getMessage());
- }
-
- UpdateGroupOutput updateGroupOutput = rpcResultFromOFLib.getResult();
-
- AddGroupOutputBuilder addGroupOutput = new AddGroupOutputBuilder();
- addGroupOutput.setTransactionId(updateGroupOutput.getTransactionId());
- AddGroupOutput result = addGroupOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<AddGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Add Group RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+ OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task =
+ OFRpcTaskFactory.createAddGroupTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateGroupOutput>> result = task.submit();
+
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(result),
+ OFRpcFutureResultTransformFactory.createForAddGroupOutput());
}
@Override
public Future<RpcResult<AddMeterOutput>> addMeter(final AddMeterInput input) {
LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
- Long xId = null;
- // For Meter provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
-
+
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the AddMeterInput to MeterModInput
- MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version);
- xId = session.getNextXid();
- ofMeterModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- MeterAddedBuilder meterMod = new MeterAddedBuilder(
- (org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter) input);
- meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- meterMod.setMeterRef(input.getMeterRef());
- rpcNotificationProviderService.publish(meterMod.build());
- }
-
- Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie);
-
- RpcResult<UpdateMeterOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for AddMeter RPC" + ex.getMessage());
- }
-
- UpdateMeterOutput updateMeterOutput = rpcResultFromOFLib.getResult();
-
- AddMeterOutputBuilder addMeterOutput = new AddMeterOutputBuilder();
- addMeterOutput.setTransactionId(updateMeterOutput.getTransactionId());
- AddMeterOutput result = addMeterOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<AddMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Add Meter RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+
+ OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task =
+ OFRpcTaskFactory.createAddMeterTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateMeterOutput>> result = task.submit();
+
+ return Futures.transform(JdkFutureAdapters.listenInPoolThread(result),
+ OFRpcFutureResultTransformFactory.createForAddMeterOutput());
}
@Override
SwitchConnectionDistinguisher cookie = null;
if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
barrierInput.setXid(xId);
barrierInput.setVersion(version);
@SuppressWarnings("unused")
}
// Convert the RemoveFlowInput to FlowModInput
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, this.getSessionContext()
+ FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version, sessionContext
.getFeatures().getDatapathId());
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
ofFlowModInput.setXid(xId);
if (null != rpcNotificationProviderService) {
SwitchConnectionDistinguisher cookie = null;
if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
BarrierInputBuilder barrierInput = new BarrierInputBuilder();
barrierInput.setVersion(version);
barrierInput.setXid(xId);
// Convert the RemoveGroupInput to GroupModInput
GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input, version, this.getSessionContext()
.getFeatures().getDatapathId());
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
ofGroupModInput.setXid(xId);
if (null != rpcNotificationProviderService) {
// the request can be routed through any connection to the switch
SwitchConnectionDistinguisher cookie = null;
if (Objects.firstNonNull(input.isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
BarrierInputBuilder barrierInput = new BarrierInputBuilder();
barrierInput.setVersion(version);
barrierInput.setXid(xId);
// Convert the RemoveMeterInput to MeterModInput
MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input, version);
- xId = session.getNextXid();
+ xId = sessionContext.getNextXid();
ofMeterModInput.setXid(xId);
if (null != rpcNotificationProviderService) {
SwitchConnectionDistinguisher cookie = null;
OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
- OFRpcTaskFactory.createUpdateFlowTask(maxTimeout, maxTimeoutUnit, rpcTaskHelper);
- rpcTaskHelper.initTask(task, input, cookie);
- OFSessionUtil.getSessionManager().getRpcPool().submit(task);
+ OFRpcTaskFactory.createUpdateFlowTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit();
- return task.getResult();
+ return result;
}
@Override
public Future<RpcResult<UpdateGroupOutput>> updateGroup(final UpdateGroupInput input) {
LOG.debug("Calling the update Group Mod RPC method on MessageDispatchService");
- Long xId = null;
-
- // For Flow provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
-
+
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.getUpdatedGroup().isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the UpdateGroupInput to GroupModInput
- GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(input.getUpdatedGroup(), version, this
- .getSessionContext().getFeatures().getDatapathId());
- xId = session.getNextXid();
- ofGroupModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
- groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- groupMod.setGroupRef(input.getGroupRef());
- rpcNotificationProviderService.publish(groupMod.build());
- }
-
- Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = messageService.groupMod(ofGroupModInput.build(), cookie);
-
- RpcResult<UpdateGroupOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for updateGroup RPC" + ex.getMessage());
- }
-
- UpdateGroupOutput updateGroupOutputOFLib = rpcResultFromOFLib.getResult();
-
- UpdateGroupOutputBuilder updateGroupOutput = new UpdateGroupOutputBuilder();
- updateGroupOutput.setTransactionId(updateGroupOutputOFLib.getTransactionId());
- UpdateGroupOutput result = updateGroupOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Update Group RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+
+ OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task =
+ OFRpcTaskFactory.createUpdateGroupTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateGroupOutput>> result = task.submit();
+
+ return result;
}
@Override
public Future<RpcResult<UpdateMeterOutput>> updateMeter(final UpdateMeterInput input) {
LOG.debug("Calling the MeterMod RPC method on MessageDispatchService");
- Long xId = null;
-
- // For Meter provisioning, the SwitchConnectionDistinguisher is set to
- // null so
- // the request can be routed through any connection to the switch
+
+ // use primary connection
SwitchConnectionDistinguisher cookie = null;
- if (Objects.firstNonNull(input.getUpdatedMeter().isBarrier(), Boolean.FALSE)) {
- xId = session.getNextXid();
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(version);
- barrierInput.setXid(xId);
- @SuppressWarnings("unused")
- Future<RpcResult<BarrierOutput>> barrierOFLib = messageService.barrier(barrierInput.build(), cookie);
- }
-
- // Convert the UpdateMeterInput to MeterModInput
- MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(input.getUpdatedMeter(), version);
- xId = session.getNextXid();
- ofMeterModInput.setXid(xId);
-
- if (null != rpcNotificationProviderService) {
- MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
- meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- meterMod.setMeterRef(input.getMeterRef());
- rpcNotificationProviderService.publish(meterMod.build());
- }
-
- Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = messageService.meterMod(ofMeterModInput.build(), cookie);
-
- RpcResult<UpdateMeterOutput> rpcResultFromOFLib = null;
-
- try {
- rpcResultFromOFLib = resultFromOFLib.get();
- } catch (Exception ex) {
- LOG.error(" Error while getting result for UpdateMeter RPC" + ex.getMessage());
- }
-
- UpdateMeterOutput updateMeterOutputFromOFLib = rpcResultFromOFLib.getResult();
-
- UpdateMeterOutputBuilder updateMeterOutput = new UpdateMeterOutputBuilder();
- updateMeterOutput.setTransactionId(updateMeterOutputFromOFLib.getTransactionId());
- UpdateMeterOutput result = updateMeterOutput.build();
-
- Collection<RpcError> errors = rpcResultFromOFLib.getErrors();
- RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning the Update Meter RPC result to MD-SAL");
- return Futures.immediateFuture(rpcResult);
+
+ OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task =
+ OFRpcTaskFactory.createUpdateMeterTask(rpcTaskContext, input, cookie);
+ ListenableFuture<RpcResult<UpdateMeterOutput>> result = task.submit();
+
+ return result;
}
@Override
RpcResult<GetQueueStatisticsFromGivenPortOutput> rpcResult = Rpcs.getRpcResult(true, output.build(), errors);
return Futures.immediateFuture(rpcResult);
}
-
- /**
- * @param input
- * @param cookie
- * @param session
- * @param messageService
- * @return barrier result
- */
- public static Future<RpcResult<BarrierOutput>> sendBarrier(
- SwitchConnectionDistinguisher cookie, SessionContext session,
- IMessageDispatchService messageService) {
- BarrierInputBuilder barrierInput = new BarrierInputBuilder();
- barrierInput.setVersion(session.getFeatures().getVersion());
- barrierInput.setXid(session.getNextXid());
- return messageService.barrier(barrierInput.build(), cookie);
- }
-
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+/**
+ * @param <N> type of notification
+ *
+ */
+public interface NotificationComposer<N extends Notification> {
+
+ /**
+ * @return notification instance
+ */
+ N compose();
+}
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
protected static Logger LOG = LoggerFactory
.getLogger(OFRpcFutureResultTransformFactory.class);
+ /**
+ * @param input
+ * @param result
+ * @return
+ */
+ protected static <E> RpcResult<E> assembleRpcResult(RpcResult<?> input, E result) {
+ Collection<RpcError> errors = input.getErrors();
+ RpcResult<E> rpcResult = Rpcs.getRpcResult(input.isSuccessful(), result, errors);
+ return rpcResult;
+ }
+
/**
* @return translator from {@link UpdateFlowOutput} to {@link AddFlowOutput}
*/
return new Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>>() {
@Override
- public RpcResult<AddFlowOutput> apply(final RpcResult<UpdateFlowOutput> input) {
+ public RpcResult<AddFlowOutput> apply(RpcResult<UpdateFlowOutput> input) {
UpdateFlowOutput updateFlowOutput = input.getResult();
};
}
+ /**
+ * @return translator from {@link UpdateGroupOutput} to {@link AddGroupOutput}
+ */
+ public static Function<RpcResult<UpdateGroupOutput>, RpcResult<AddGroupOutput>> createForAddGroupOutput() {
+ return new Function<RpcResult<UpdateGroupOutput>,RpcResult<AddGroupOutput>>() {
+
+ @Override
+ public RpcResult<AddGroupOutput> apply(RpcResult<UpdateGroupOutput> input) {
+ UpdateGroupOutput updateGroupOutput = input.getResult();
+
+ AddGroupOutputBuilder addGroupOutput = new AddGroupOutputBuilder();
+ addGroupOutput.setTransactionId(updateGroupOutput.getTransactionId());
+ AddGroupOutput result = addGroupOutput.build();
+
+ RpcResult<AddGroupOutput> rpcResult = assembleRpcResult(input, result);
+ LOG.debug("Returning the Add Group RPC result to MD-SAL");
+ return rpcResult;
+ }
+ };
+ }
/**
- * @param input
- * @param result
- * @return
+ * @return translator from {@link UpdateGroupOutput} to {@link AddGroupOutput}
*/
- protected static <E> RpcResult<E> assembleRpcResult(RpcResult<?> input, E result) {
- Collection<RpcError> errors = input.getErrors();
- RpcResult<E> rpcResult = Rpcs.getRpcResult(input.isSuccessful(), result, errors);
- return rpcResult;
+ public static Function<RpcResult<UpdateMeterOutput>, RpcResult<AddMeterOutput>> createForAddMeterOutput() {
+ return new Function<RpcResult<UpdateMeterOutput>,RpcResult<AddMeterOutput>>() {
+
+ @Override
+ public RpcResult<AddMeterOutput> apply(final RpcResult<UpdateMeterOutput> input) {
+ UpdateMeterOutput updateMeterOutput = input.getResult();
+
+ AddMeterOutputBuilder addMeterOutput = new AddMeterOutputBuilder();
+ addMeterOutput.setTransactionId(updateMeterOutput.getTransactionId());
+ AddMeterOutput result = addMeterOutput.build();
+
+ RpcResult<AddMeterOutput> rpcResult = assembleRpcResult(input, result);
+ LOG.debug("Returning the Add Meter RPC result to MD-SAL");
+ return rpcResult;
+ }
+ };
}
+
+
+
+
}
*/
package org.opendaylight.openflowplugin.openflow.md.core.sal;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* @param <T> input type
* @param <K> future output type
*/
-public abstract class OFRpcTask<T, K> implements Runnable {
+public abstract class OFRpcTask<T, K> implements Callable<ListenableFuture<K>> {
- private SwitchConnectionDistinguisher cookie;
- private IMessageDispatchService messageService;
- private SessionContext session;
+ private OFRpcTaskContext taskContext;
private T input;
- private SettableFuture<K> result;
- private NotificationProviderService rpcNotificationProviderService;
-
- /**
- * @return the result
- */
- public SettableFuture<K> getResult() {
- return result;
- }
+ private SwitchConnectionDistinguisher cookie;
/**
- * @param result the result to set
+ * @param taskContext
+ * @param input
+ * @param cookie
*/
- public void setResult(SettableFuture<K> result) {
- this.result = result;
+ public OFRpcTask(OFRpcTaskContext taskContext, SwitchConnectionDistinguisher cookie, T input) {
+ this.taskContext = taskContext;
+ this.cookie = cookie;
+ this.input = input;
}
/**
}
/**
- * @return the messageService
+ * @param cookie the cookie to set
*/
- public IMessageDispatchService getMessageService() {
- return messageService;
+ public void setCookie(SwitchConnectionDistinguisher cookie) {
+ this.cookie = cookie;
}
/**
- * @return the session
+ * @return the input
*/
- public SessionContext getSession() {
- return session;
+ public T getInput() {
+ return input;
}
-
+
/**
- * @return protocol version
+ * @param input the input to set
*/
- public Short getVersion() {
- return session.getFeatures().getVersion();
+ public void setInput(T input) {
+ this.input = input;
}
/**
- * @param cookie the cookie to set
+ * @return the rpcNotificationProviderService
*/
- public void setCookie(SwitchConnectionDistinguisher cookie) {
- this.cookie = cookie;
+ public NotificationProviderService getRpcNotificationProviderService() {
+ return taskContext.getRpcNotificationProviderService();
}
/**
- * @param messageService the messageService to set
+ * @return message service
+ * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMessageService()
*/
- public void setMessageService(IMessageDispatchService messageService) {
- this.messageService = messageService;
+ public IMessageDispatchService getMessageService() {
+ return taskContext.getMessageService();
}
/**
- * @param session the session to set
+ * @return session
+ * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getSession()
*/
- public void setSession(SessionContext session) {
- this.session = session;
+ public SessionContext getSession() {
+ return taskContext.getSession();
}
/**
- * @return the input
+ * @return max timeout
+ * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMaxTimeout()
*/
- public T getInput() {
- return input;
+ public long getMaxTimeout() {
+ return taskContext.getMaxTimeout();
}
/**
- * @param input the input to set
+ * @return time unit for max timeout
+ * @see org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskContext#getMaxTimeoutUnit()
*/
- public void setInput(T input) {
- this.input = input;
+ public TimeUnit getMaxTimeoutUnit() {
+ return taskContext.getMaxTimeoutUnit();
}
-
+
/**
- * @param rpcNotificationProviderService
+ * @return protocol version
*/
- public void setRpcNotificationProviderService(
- NotificationProviderService rpcNotificationProviderService) {
- this.rpcNotificationProviderService = rpcNotificationProviderService;
+ public Short getVersion() {
+ return taskContext.getSession().getFeatures().getVersion();
+
}
/**
- * @return the rpcNotificationProviderService
+ * @return the taskContext
*/
- public NotificationProviderService getRpcNotificationProviderService() {
- return rpcNotificationProviderService;
+ public OFRpcTaskContext getTaskContext() {
+ return taskContext;
+ }
+
+ /**
+ * submit task into rpc worker pool
+ * @return future result of task
+ */
+ public ListenableFuture<K> submit() {
+ ListenableFuture<ListenableFuture<K>> compoundResult = getTaskContext().getRpcPool().submit(this);
+ return Futures.dereference(compoundResult);
}
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+/**
+ *
+ */
+public class OFRpcTaskContext {
+
+ private IMessageDispatchService messageService;
+ private SessionContext session;
+ private NotificationProviderService rpcNotificationProviderService;
+ private long maxTimeout;
+ private TimeUnit maxTimeoutUnit;
+ private ListeningExecutorService rpcPool;
+
+ /**
+ * @return the messageService
+ */
+ public IMessageDispatchService getMessageService() {
+ return messageService;
+ }
+ /**
+ * @param messageService the messageService to set
+ */
+ public void setMessageService(IMessageDispatchService messageService) {
+ this.messageService = messageService;
+ }
+ /**
+ * @return the session
+ */
+ public SessionContext getSession() {
+ return session;
+ }
+ /**
+ * @param session the session to set
+ */
+ public void setSession(SessionContext session) {
+ this.session = session;
+ }
+ /**
+ * @return the rpcNotificationProviderService
+ */
+ public NotificationProviderService getRpcNotificationProviderService() {
+ return rpcNotificationProviderService;
+ }
+ /**
+ * @param rpcNotificationProviderService the rpcNotificationProviderService to set
+ */
+ public void setRpcNotificationProviderService(
+ NotificationProviderService rpcNotificationProviderService) {
+ this.rpcNotificationProviderService = rpcNotificationProviderService;
+ }
+ /**
+ * @return the maxTimeout
+ */
+ public long getMaxTimeout() {
+ return maxTimeout;
+ }
+ /**
+ * @param maxTimeout the maxTimeout to set
+ */
+ public void setMaxTimeout(long maxTimeout) {
+ this.maxTimeout = maxTimeout;
+ }
+ /**
+ * @return the maxTimeoutUnit
+ */
+ public TimeUnit getMaxTimeoutUnit() {
+ return maxTimeoutUnit;
+ }
+ /**
+ * @param maxTimeoutUnit the maxTimeoutUnit to set
+ */
+ public void setMaxTimeoutUnit(TimeUnit maxTimeoutUnit) {
+ this.maxTimeoutUnit = maxTimeoutUnit;
+ }
+ /**
+ * @param rpcPool
+ */
+ public void setRpcPool(ListeningExecutorService rpcPool) {
+ this.rpcPool = rpcPool;
+ }
+
+ /**
+ * @return the rpcPool
+ */
+ public ListeningExecutorService getRpcPool() {
+ return rpcPool;
+ }
+}
package org.opendaylight.openflowplugin.openflow.md.core.sal;
import java.math.BigInteger;
+import java.util.Collection;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.GroupConvertor;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.MeterConvertor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAdded;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdated;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev131103.TransactionId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAdded;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.Group;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAdded;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterAddedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInputBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
/**
*
*/
public abstract class OFRpcTaskFactory {
/**
- * @param maxTimeout
- * @param maxTimeoutUnit
- * @param helper
+ * @param taskContext
+ * @param input
+ * @param cookie
* @return UpdateFlow task
*/
public static OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> createAddFlowTask(
- final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
- OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
- new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>() {
-
+ OFRpcTaskContext taskContext, AddFlowInput input,
+ SwitchConnectionDistinguisher cookie) {
+ OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task =
+ new OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
+
@Override
- public void run() {
- helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().isBarrier(), getCookie(), getResult());
- if (getResult().isDone()) {
- return;
- }
-
- // Convert the AddFlowInput to FlowModInput
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(),
- getVersion(), getSession().getFeatures().getDatapathId());
- Long xId = getSession().getNextXid();
- ofFlowModInput.setXid(xId);
-
- if (null != getRpcNotificationProviderService()) {
- FlowAddedBuilder newFlow = new FlowAddedBuilder(
- (org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow) getInput());
- newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- newFlow.setFlowRef(getInput().getFlowRef());
- getRpcNotificationProviderService().publish(newFlow.build());
+ public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = SettableFuture.create();
+
+ Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+ if (!barrierErrors.isEmpty()) {
+ OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
+ } else {
+ // Convert the AddFlowInput to FlowModInput
+ FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput(),
+ getVersion(), getSession().getFeatures().getDatapathId());
+ final Long xId = getSession().getNextXid();
+ ofFlowModInput.setXid(xId);
+
+ Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
+ getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+ result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
+ createFlowAddedNotification(xId, getInput()));
}
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
- getMessageService().flowMod(ofFlowModInput.build(), getCookie());
- OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+ return result;
}
};
+
return task;
}
/**
- * @param maxTimeout
- * @param maxTimeoutUnit
- * @param helper
+ * @param xId
+ * @return
+ */
+ protected static NotificationComposer<FlowAdded> createFlowAddedNotification(
+ final Long xId, final AddFlowInput input) {
+ return new NotificationComposer<FlowAdded>() {
+ @Override
+ public FlowAdded compose() {
+ FlowAddedBuilder newFlow = new FlowAddedBuilder((Flow) input);
+ newFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+ newFlow.setFlowRef(input.getFlowRef());
+ return newFlow.build();
+ }
+ };
+ }
+
+ /**
+ * @param taskContext
+ * @param input
+ * @param cookie
* @return UpdateFlow task
*/
public static OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> createUpdateFlowTask(
- final long maxTimeout, final TimeUnit maxTimeoutUnit, final OFRpcTaskHelper helper) {
- OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
- new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>() {
+ OFRpcTaskContext taskContext, UpdateFlowInput input,
+ SwitchConnectionDistinguisher cookie) {
+
+ OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>> task =
+ new OFRpcTask<UpdateFlowInput, RpcResult<UpdateFlowOutput>>(taskContext, cookie, input) {
+
+ @Override
+ public ListenableFuture<RpcResult<UpdateFlowOutput>> call() {
+ ListenableFuture<RpcResult<UpdateFlowOutput>> result = null;
+ Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(),
+ getInput().getUpdatedFlow().isBarrier(), getCookie());
+ if (!barrierErrors.isEmpty()) {
+ OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateFlowOutput>>) result), barrierErrors);
+ } else {
+ // Convert the AddFlowInput to FlowModInput
+ FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(),
+ getVersion(), getSession().getFeatures().getDatapathId());
+ Long xId = getSession().getNextXid();
+ ofFlowModInput.setXid(xId);
+
+ Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
+ getMessageService().flowMod(ofFlowModInput.build(), getCookie());
+ result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
+ createFlowUpdatedNotification(xId, getInput()));
+ }
+ return result;
+ }
+ };
+ return task;
+ }
+ /**
+ * @param xId
+ * @param input
+ * @return
+ */
+ protected static NotificationComposer<FlowUpdated> createFlowUpdatedNotification(
+ final Long xId, final UpdateFlowInput input) {
+ return new NotificationComposer<FlowUpdated>() {
+ @Override
+ public FlowUpdated compose() {
+ FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(input.getUpdatedFlow());
+ updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+ updFlow.setFlowRef(input.getFlowRef());
+ return updFlow.build();
+ }
+ };
+ }
+
+ /**
+ * @param taskContext
+ * @param input
+ * @param cookie
+ * @return update group task
+ */
+ public static OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> createAddGroupTask(
+ final OFRpcTaskContext taskContext, AddGroupInput input,
+ final SwitchConnectionDistinguisher cookie) {
+ OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>> task =
+ new OFRpcTask<AddGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
+
@Override
- public void run() {
- helper.rawBarrierSend(maxTimeout, maxTimeoutUnit, getInput().getUpdatedFlow().isBarrier(), getCookie(), getResult());
- if (getResult().isDone()) {
- return;
+ public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
+ ListenableFuture<RpcResult<UpdateGroupOutput>> result = SettableFuture.create();
+
+ Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+ if (!barrierErrors.isEmpty()) {
+ OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
+ } else {
+ // Convert the AddGroupInput to GroupModInput
+ GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(getInput(),
+ getVersion(), getSession().getFeatures().getDatapathId());
+ final Long xId = getSession().getNextXid();
+ ofGroupModInput.setXid(xId);
+
+ Future<RpcResult<UpdateGroupOutput>> resultFromOFLib = getMessageService()
+ .groupMod(ofGroupModInput.build(), getCookie());
+ result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
+ createGroupAddedNotification(xId, getInput()));
}
- // Convert the AddFlowInput to FlowModInput
- FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(getInput().getUpdatedFlow(),
- getVersion(), getSession().getFeatures().getDatapathId());
- Long xId = getSession().getNextXid();
- ofFlowModInput.setXid(xId);
+ return result;
+ }
+ };
+
+ return task;
+ }
+
- if (null != getRpcNotificationProviderService()) {
- FlowUpdatedBuilder updFlow = new FlowUpdatedBuilder(getInput().getUpdatedFlow());
- updFlow.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
- updFlow.setFlowRef(getInput().getFlowRef());
- getRpcNotificationProviderService().publish(updFlow.build());
+ /**
+ * @param xId
+ * @param input
+ * @return
+ */
+ protected static NotificationComposer<GroupAdded> createGroupAddedNotification(
+ final Long xId, final AddGroupInput input) {
+ return new NotificationComposer<GroupAdded>() {
+ @Override
+ public GroupAdded compose() {
+ GroupAddedBuilder groupMod = new GroupAddedBuilder((Group) input);
+ groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+ groupMod.setGroupRef(input.getGroupRef());
+ return groupMod.build();
+ }
+ };
+ }
+
+ /**
+ * @param taskContext
+ * @param input
+ * @param cookie
+ * @return update meter task
+ */
+ public static OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> createAddMeterTask(
+ OFRpcTaskContext taskContext, AddMeterInput input,
+ SwitchConnectionDistinguisher cookie) {
+ OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>> task =
+ new OFRpcTask<AddMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
+
+ @Override
+ public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
+ ListenableFuture<RpcResult<UpdateMeterOutput>> result = SettableFuture.create();
+
+ Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(), getInput().isBarrier(), getCookie());
+ if (!barrierErrors.isEmpty()) {
+ OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
+ } else {
+ // Convert the AddGroupInput to GroupModInput
+ MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(getInput(), getVersion());
+ final Long xId = getSession().getNextXid();
+ ofMeterModInput.setXid(xId);
+
+ Future<RpcResult<UpdateMeterOutput>> resultFromOFLib = getMessageService()
+ .meterMod(ofMeterModInput.build(), getCookie());
+ result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
+ createMeterAddedNotification(xId, getInput()));
}
- Future<RpcResult<UpdateFlowOutput>> resultFromOFLib =
- getMessageService().flowMod(ofFlowModInput.build(), getCookie());
- OFRpcTaskHelper.chainFutures(resultFromOFLib, getResult());
+ return result;
}
};
+
return task;
+
+ }
+
+ /**
+ * @param xId
+ * @param input
+ * @return
+ */
+ protected static NotificationComposer<MeterAdded> createMeterAddedNotification(
+ final Long xId, final AddMeterInput input) {
+ return new NotificationComposer<MeterAdded>() {
+ @Override
+ public MeterAdded compose() {
+ MeterAddedBuilder meterMod = new MeterAddedBuilder((Meter) input);
+ meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+ meterMod.setMeterRef(input.getMeterRef());
+ return meterMod.build();
+ }
+ };
+ }
+
+ /**
+ * @param taskContext
+ * @param input
+ * @param cookie
+ * @return UpdateFlow task
+ */
+ public static OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> createUpdateGroupTask(
+ OFRpcTaskContext taskContext, UpdateGroupInput input,
+ SwitchConnectionDistinguisher cookie) {
+ OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>> task =
+ new OFRpcTask<UpdateGroupInput, RpcResult<UpdateGroupOutput>>(taskContext, cookie, input) {
+
+ @Override
+ public ListenableFuture<RpcResult<UpdateGroupOutput>> call() {
+ ListenableFuture<RpcResult<UpdateGroupOutput>> result = null;
+ Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(),
+ getInput().getUpdatedGroup().isBarrier(), getCookie());
+ if (!barrierErrors.isEmpty()) {
+ OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateGroupOutput>>) result), barrierErrors);
+ } else {
+ // Convert the UpdateGroupInput to GroupModInput
+ GroupModInputBuilder ofGroupModInput = GroupConvertor.toGroupModInput(
+ getInput().getUpdatedGroup(), getVersion(),
+ getSession().getFeatures().getDatapathId());
+ final Long xId = getSession().getNextXid();
+ ofGroupModInput.setXid(xId);
+
+ Future<RpcResult<UpdateGroupOutput>> resultFromOFLib =
+ getMessageService().groupMod(ofGroupModInput.build(), getCookie());
+ result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
+ createGroupUpdatedNotification(xId, getInput()));
+ }
+ return result;
+ }
+ };
+ return task;
+ }
+
+ /**
+ * @param xId
+ * @param input
+ * @return
+ */
+ protected static NotificationComposer<GroupUpdated> createGroupUpdatedNotification(
+ final Long xId, final UpdateGroupInput input) {
+ return new NotificationComposer<GroupUpdated>() {
+ @Override
+ public GroupUpdated compose() {
+ GroupUpdatedBuilder groupMod = new GroupUpdatedBuilder(input.getUpdatedGroup());
+ groupMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+ groupMod.setGroupRef(input.getGroupRef());
+ return groupMod.build();
+ }
+ };
}
+ /**
+ * @param taskContext
+ * @param input
+ * @param cookie
+ * @return update meter task
+ */
+ public static OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> createUpdateMeterTask(
+ OFRpcTaskContext taskContext, UpdateMeterInput input,
+ SwitchConnectionDistinguisher cookie) {
+ OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>> task =
+ new OFRpcTask<UpdateMeterInput, RpcResult<UpdateMeterOutput>>(taskContext, cookie, input) {
+
+ @Override
+ public ListenableFuture<RpcResult<UpdateMeterOutput>> call() {
+ ListenableFuture<RpcResult<UpdateMeterOutput>> result = null;
+ Collection<RpcError> barrierErrors = OFRpcTaskUtil.manageBarrier(getTaskContext(),
+ getInput().getUpdatedMeter().isBarrier(), getCookie());
+ if (!barrierErrors.isEmpty()) {
+ OFRpcTaskUtil.wrapBarrierErrors(((SettableFuture<RpcResult<UpdateMeterOutput>>) result), barrierErrors);
+ } else {
+ // Convert the UpdateMeterInput to MeterModInput
+ MeterModInputBuilder ofMeterModInput = MeterConvertor.toMeterModInput(
+ getInput().getUpdatedMeter(), getVersion());
+ final Long xId = getSession().getNextXid();
+ ofMeterModInput.setXid(xId);
+
+ Future<RpcResult<UpdateMeterOutput>> resultFromOFLib =
+ getMessageService().meterMod(ofMeterModInput.build(), getCookie());
+ result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+
+ OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
+ createMeterUpdatedNotification(xId, getInput()));
+ }
+ return result;
+ }
+ };
+ return task;
+ }
+
+ /**
+ * @param xId
+ * @param input
+ * @return
+ */
+ protected static NotificationComposer<MeterUpdated> createMeterUpdatedNotification(
+ final Long xId, final UpdateMeterInput input) {
+ return new NotificationComposer<MeterUpdated>() {
+ @Override
+ public MeterUpdated compose() {
+ MeterUpdatedBuilder meterMod = new MeterUpdatedBuilder(input.getUpdatedMeter());
+ meterMod.setTransactionId(new TransactionId(BigInteger.valueOf(xId.intValue())));
+ meterMod.setMeterRef(input.getMeterRef());
+ return meterMod.build();
+ }
+ };
+ }
}
+++ /dev/null
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.openflowplugin.openflow.md.core.sal;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
-import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-import com.google.common.base.Objects;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.SettableFuture;
-
-/**
- *
- */
-public class OFRpcTaskHelper {
-
- private IMessageDispatchService messageService;
- private SessionContext session;
- private NotificationProviderService rpcNotificationProviderService;
- /**
- * @param cookie
- * @param messageService
- * @param session
- * @param rpcNotificationProviderService
- */
- public OFRpcTaskHelper(IMessageDispatchService messageService, SessionContext session,
- NotificationProviderService rpcNotificationProviderService) {
- this.messageService = messageService;
- this.session = session;
- this.rpcNotificationProviderService = rpcNotificationProviderService;
- }
-
-
- /**
- * @param task
- * @param input
- * @param cookie
- * @return inited task
- */
- public <T, K> OFRpcTask<T, K> initTask(OFRpcTask<T, K> task, T input, SwitchConnectionDistinguisher cookie) {
- task.setMessageService(messageService);
- task.setSession(session);
- task.setRpcNotificationProviderService(rpcNotificationProviderService);
- task.setResult(SettableFuture.<K>create());
- task.setCookie(cookie);
- task.setInput(input);
- return task;
- }
-
- /**
- * @param intern
- * @param wrapper
- */
- public static <K> void chainFutures(final Future<K> intern, final SettableFuture<K> wrapper) {
- Futures.addCallback(
- JdkFutureAdapters.listenInPoolThread(intern),
- new FutureCallback<K>() {
-
- @Override
- public void onSuccess(
- K result) {
- wrapper.set(result);
- }
-
- @Override
- public void onFailure(Throwable t) {
- wrapper.setException(t);
- }
-
- });
- }
-
- /**
- * @param maxTimeout
- * @param maxTimeoutUnit
- * @param isBarrier
- * @param cookie
- * @param result
- */
- public <T> void rawBarrierSend(final long maxTimeout, final TimeUnit maxTimeoutUnit,
- Boolean isBarrier, SwitchConnectionDistinguisher cookie, SettableFuture<RpcResult<T>> result) {
- if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
- Future<RpcResult<BarrierOutput>> barrierFuture = ModelDrivenSwitchImpl.sendBarrier(cookie, session, messageService);
- try {
- RpcResult<BarrierOutput> barrierResult = barrierFuture.get(maxTimeout, maxTimeoutUnit);
- if (!barrierResult.isSuccessful()) {
- result.set(Rpcs.<T>getRpcResult(false, barrierResult.getErrors()));
- }
- } catch (Exception e) {
- result.setException(e);
- }
- }
- }
-}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core.sal;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.openflowplugin.openflow.md.OFConstants;
+import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ *
+ */
+public abstract class OFRpcTaskUtil {
+
+ /**
+ * @param taskContext
+ * @param isBarrier
+ * @param cookie
+ * @return rpcResult of given type, containing wrapped errors of barrier sending (if any) or success
+ */
+ public static Collection<RpcError> manageBarrier(OFRpcTaskContext taskContext, Boolean isBarrier,
+ SwitchConnectionDistinguisher cookie) {
+ Collection<RpcError> errors = null;
+ if (Objects.firstNonNull(isBarrier, Boolean.FALSE)) {
+ Future<RpcResult<BarrierOutput>> barrierFuture = sendBarrier(taskContext.getSession(), cookie, taskContext.getMessageService());
+ try {
+ RpcResult<BarrierOutput> barrierResult = barrierFuture.get(
+ taskContext.getMaxTimeout(), taskContext.getMaxTimeoutUnit());
+ if (!barrierResult.isSuccessful()) {
+ errors = barrierResult.getErrors();
+ }
+ } catch (Exception e) {
+ RpcError rpcError = RpcErrors.getRpcError(
+ OFConstants.APPLICATION_TAG, OFConstants.ERROR_TAG_TIMEOUT,
+ "barrier sending failed", ErrorSeverity.WARNING,
+ "switch failed to respond on barrier request - message ordering is not preserved", ErrorType.RPC, e);
+ errors = Lists.newArrayList(rpcError);
+ }
+ }
+
+ if (errors == null) {
+ errors = Collections.emptyList();
+ }
+
+ return errors;
+ }
+
+ /**
+ * @param session
+ * @param cookie
+ * @param messageService
+ * @return barrier response
+ */
+ private static Future<RpcResult<BarrierOutput>> sendBarrier(SessionContext session,
+ SwitchConnectionDistinguisher cookie, IMessageDispatchService messageService) {
+ BarrierInput barrierInput = MessageFactory.createBarrier(
+ session.getFeatures().getVersion(), session.getNextXid());
+ return messageService.barrier(barrierInput, cookie);
+ }
+
+ /**
+ * @param result rpcResult with success = false, errors = given collection
+ * @param barrierErrors
+ */
+ public static <T> void wrapBarrierErrors(SettableFuture<RpcResult<T>> result,
+ Collection<RpcError> barrierErrors) {
+ result.set(Rpcs.<T>getRpcResult(false, barrierErrors));
+ }
+
+ /**
+ * @param originalResult
+ * @param notificationProviderService
+ * @param notificationComposer lazy notification composer
+ */
+ public static <R, N extends Notification> void hookFutureNotification(ListenableFuture<R> originalResult,
+ final NotificationProviderService notificationProviderService,
+ final NotificationComposer<N> notificationComposer) {
+ Futures.addCallback(originalResult, new FutureCallback<R>() {
+ @Override
+ public void onSuccess(R result) {
+ if (null != notificationProviderService) {
+ notificationProviderService.publish(notificationComposer.compose());
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ //NOOP
+ }
+ });
+
+ }
+
+}
package org.opendaylight.openflowplugin.openflow.md.core.session;
import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
import java.util.concurrent.Future;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.service.rev131107.UpdatePortOutputBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* message dispatch service to send the message to switch.
}
@Override
- public Future<RpcResult<UpdateFlowOutput>> flowMod(FlowModInput input, SwitchConnectionDistinguisher cookie) {
+ public Future<RpcResult<UpdateFlowOutput>> flowMod(final FlowModInput input, SwitchConnectionDistinguisher cookie) {
LOG.debug("Calling OFLibrary flowMod");
Future<RpcResult<Void>> response = getConnectionAdapter(cookie).flowMod(input);
- // Send the same Xid back to caller - MessageDrivenSwitch
- UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
- BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ;
- flowModOutput.setTransactionId(new TransactionId(bigIntXid));
-
- UpdateFlowOutput result = flowModOutput.build();
- Collection<RpcError> errors = Collections.emptyList();
- RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- // solution 1: sending directly and hooking listener to get error
- // hookup listener to catch the possible error with no reference to returned future-object
- LOG.debug("Returning to ModelDrivenSwitch for flowMod RPC");
- return Futures.immediateFuture(rpcResult);
-
+ // appending xid
+ ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform(
+ JdkFutureAdapters.listenInPoolThread(response),
+ new Function<RpcResult<Void>,RpcResult<UpdateFlowOutput>>() {
+
+ @Override
+ public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) {
+ UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder();
+ BigInteger bigIntXid = BigInteger.valueOf(input.getXid()) ;
+ flowModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+ UpdateFlowOutput result = flowModOutput.build();
+ RpcResult<UpdateFlowOutput> rpcResult = Rpcs.getRpcResult(
+ inputArg.isSuccessful(), result, inputArg.getErrors());
+ return rpcResult;
+ }
+ });
+
+ return xidResult;
}
@Override
}
@Override
- public Future<RpcResult<UpdateGroupOutput>> groupMod(GroupModInput input, SwitchConnectionDistinguisher cookie) {
+ public Future<RpcResult<UpdateGroupOutput>> groupMod(final GroupModInput input, SwitchConnectionDistinguisher cookie) {
LOG.debug("Calling OFLibrary groupMod");
Future<RpcResult<Void>> response = getConnectionAdapter(cookie).groupMod(input);
- // Send the same Xid back to caller - MessageDrivenSwitch
- UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
- BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
- groupModOutput.setTransactionId(new TransactionId(bigIntXid));
-
- UpdateGroupOutput result = groupModOutput.build();
- Collection<RpcError> errors = Collections.emptyList();
- RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- // solution 1: sending directly and hooking listener to get error
- // hookup listener to catch the possible error with no reference to returned future-object
- LOG.debug("Returning to ModelDrivenSwitch for groupMod RPC");
- return Futures.immediateFuture(rpcResult);
-
+ // appending xid
+ ListenableFuture<RpcResult<UpdateGroupOutput>> xidResult = Futures.transform(
+ JdkFutureAdapters.listenInPoolThread(response),
+ new Function<RpcResult<Void>,RpcResult<UpdateGroupOutput>>() {
+
+ @Override
+ public RpcResult<UpdateGroupOutput> apply(final RpcResult<Void> inputArg) {
+ UpdateGroupOutputBuilder groupModOutput = new UpdateGroupOutputBuilder();
+ BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+ groupModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+ UpdateGroupOutput result = groupModOutput.build();
+ RpcResult<UpdateGroupOutput> rpcResult = Rpcs.getRpcResult(
+ inputArg.isSuccessful(), result, inputArg.getErrors());
+ return rpcResult;
+ }
+ });
+
+ return xidResult;
}
@Override
- public Future<RpcResult<UpdateMeterOutput>> meterMod(MeterModInput input, SwitchConnectionDistinguisher cookie) {
+ public Future<RpcResult<UpdateMeterOutput>> meterMod(final MeterModInput input, SwitchConnectionDistinguisher cookie) {
LOG.debug("Calling OFLibrary meterMod");
Future<RpcResult<Void>> response = getConnectionAdapter(cookie).meterMod(input);
- // Send the same Xid back to caller - MessageDrivenSwitch
- UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
- BigInteger bigIntXid =BigInteger.valueOf(input.getXid());
- meterModOutput.setTransactionId(new TransactionId(bigIntXid));
+ // appending xid
+ ListenableFuture<RpcResult<UpdateMeterOutput>> xidResult = Futures.transform(
+ JdkFutureAdapters.listenInPoolThread(response),
+ new Function<RpcResult<Void>,RpcResult<UpdateMeterOutput>>() {
+
+ @Override
+ public RpcResult<UpdateMeterOutput> apply(final RpcResult<Void> inputArg) {
+ UpdateMeterOutputBuilder meterModOutput = new UpdateMeterOutputBuilder();
+ BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+ meterModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+ UpdateMeterOutput result = meterModOutput.build();
+ RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(
+ inputArg.isSuccessful(), result, inputArg.getErrors());
+ return rpcResult;
+ }
+ });
- UpdateMeterOutput result = meterModOutput.build();
- Collection<RpcError> errors = Collections.emptyList();
- RpcResult<UpdateMeterOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- // solution 1: sending directly and hooking listener to get error
- // hookup listener to catch the possible error with no reference to returned future-object
- LOG.debug("Returning to ModelDrivenSwitch for meterMod RPC");
- return Futures.immediateFuture(rpcResult);
-
+ return xidResult;
}
@Override
}
@Override
- public Future<RpcResult<UpdatePortOutput>> portMod(PortModInput input, SwitchConnectionDistinguisher cookie) {
-
+ public Future<RpcResult<UpdatePortOutput>> portMod(final PortModInput input, SwitchConnectionDistinguisher cookie) {
LOG.debug("Calling OFLibrary portMod");
Future<RpcResult<Void>> response = getConnectionAdapter(cookie).portMod(input);
-
- // Send the same Xid back to caller - ModelDrivenSwitch
- UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
- String stringXid =input.getXid().toString();
- BigInteger bigIntXid = new BigInteger( stringXid );
- portModOutput.setTransactionId(new TransactionId(bigIntXid));
- UpdatePortOutput result = portModOutput.build();
- Collection<RpcError> errors = Collections.emptyList();
- RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(true, result, errors);
-
- LOG.debug("Returning to ModelDrivenSwitch for portMod RPC");
- return Futures.immediateFuture(rpcResult);
+
+ // appending xid
+ ListenableFuture<RpcResult<UpdatePortOutput>> xidResult = Futures.transform(
+ JdkFutureAdapters.listenInPoolThread(response),
+ new Function<RpcResult<Void>,RpcResult<UpdatePortOutput>>() {
+
+ @Override
+ public RpcResult<UpdatePortOutput> apply(final RpcResult<Void> inputArg) {
+ UpdatePortOutputBuilder portModOutput = new UpdatePortOutputBuilder();
+ BigInteger bigIntXid = BigInteger.valueOf(input.getXid());
+ portModOutput.setTransactionId(new TransactionId(bigIntXid));
+
+ UpdatePortOutput result = portModOutput.build();
+ RpcResult<UpdatePortOutput> rpcResult = Rpcs.getRpcResult(
+ inputArg.isSuccessful(), result, inputArg.getErrors());
+ return rpcResult;
+ }
+ });
+
+ return xidResult;
}
@Override
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-
-import java.util.concurrent.TimeUnit;
-
import org.opendaylight.openflowplugin.openflow.md.ModelDrivenSwitch;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
/**
* @author mirehak
*/
/**
* @param newFixedThreadPool
*/
- void setRpcPool(ExecutorService newFixedThreadPool);
+ void setRpcPool(ListeningExecutorService newFixedThreadPool);
/**
* @return the rpcPool instance
*/
- ExecutorService getRpcPool();
+ ListeningExecutorService getRpcPool();
}
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
/**
* @author mirehak
*/
private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping;
-
protected ListenerRegistry<SessionListener> sessionListeners;
private NotificationProviderService notificationProviderService;
private DataProviderService dataProviderService;
+ private ListeningExecutorService rpcPool;
+
/**
* @return singleton instance
}
}
};
- private ExecutorService rpcPool;
-
+
@Override
public Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> getTranslatorMapping() {
}
@Override
- public void setRpcPool(ExecutorService rpcPool) {
+ public void setRpcPool(ListeningExecutorService rpcPool) {
this.rpcPool = rpcPool;
}
@Override
- public ExecutorService getRpcPool() {
+ public ListeningExecutorService getRpcPool() {
return rpcPool;
}
}
+++ /dev/null
-package org.opendaylight.openflowplugin.openflow.md.core.session;
-
-public class TransactionKey {
-
- private static final long serialVersionUID = 7805731164917659700L;
- final private Long _xId;
-
- public TransactionKey(Long transactionId) {
- this._xId = transactionId;
- }
-
- public Long getXId() {
- return _xId;
- }
-
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((_xId == null) ? 0 : _xId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(java.lang.Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- TransactionKey other = (TransactionKey) obj;
- if (_xId == null) {
- if (other._xId != null) {
- return false;
- }
- } else if (!_xId.equals(other._xId)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("TransactionId [_xId=");
- builder.append(_xId);
- builder.append("]");
- return builder.toString();
- }
-
-}
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.aggregate._case.MultipartRequestAggregateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.request.multipart.request.body.multipart.request.flow._case.MultipartRequestFlowBuilder;
-public class FlowCreatorUtil {
+public abstract class FlowCreatorUtil {
public static void setWildcardedFlowMatch(short version,MultipartRequestFlowBuilder flowBuilder){
if(version == OFConstants.OFP_VERSION_1_0){
import java.math.BigInteger;
import java.util.List;
-public class InventoryDataServiceUtil {
+public abstract class InventoryDataServiceUtil {
private final static Logger LOG = LoggerFactory.getLogger(InventoryDataServiceUtil.class);
public final static String OF_URI_PREFIX = "openflow:";
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortStateV10;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
-public class PortTranslatorUtil {
+public abstract class PortTranslatorUtil {
public static org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortFeatures translatePortFeatures(PortFeatures apf) {
org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortFeatures napf = null;
if(apf != null){
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.controller.sal.common.util.Futures;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.openflowplugin.openflow.md.OFConstants;
import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.core.session.TransactionKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowInputBuilder;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* simple NPE smoke test
Mockito.when(context.getFeatures()).thenReturn(features);
Mockito.when(features.getDatapathId()).thenReturn(BigInteger.valueOf(1));
- OFSessionUtil.getSessionManager().setRpcPool(Executors.newFixedThreadPool(10));
+ OFSessionUtil.getSessionManager().setRpcPool(MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)));
mdSwitchOF10 = new ModelDrivenSwitchImpl(null, null, context);
mdSwitchOF13 = new ModelDrivenSwitchImpl(null, null, context);
*/
package org.opendaylight.openflowplugin.openflow.md.core.sal;
-import junit.framework.Assert;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;