*/
package org.opendaylight.openflowplugin.impl.services;
+import com.google.common.base.Function;
+
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTaskFactory;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.FlowConvertor;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.OriginalFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.module.config.rev141015.SetConfigOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
public class SalFlowServiceImpl extends CommonService implements SalFlowService {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(SalFlowServiceImpl.class);
- // TODO set cookie somehow from - DeviceContext probably (temporary set to 0 - primary connection)
- private final BigInteger connectionID = PRIMARY_CONNECTION;
-
- private interface Function {
- Future<RpcResult<Void>> apply(final BigInteger IDConnection);
- }
-
public SalFlowServiceImpl(final RpcContext rpcContext) {
super(rpcContext);
}
@Override
public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) {
- return processFlow(new Function() {
- @Override
- public ListenableFuture<RpcResult<Void>> apply(final BigInteger IDConnection) {
- final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version,
- datapathId);
- return chainFlowMods(ofFlowModInputs, 0, IDConnection);
- }
- });
+ return this.<AddFlowOutput, Void> handleServiceCall(PRIMARY_CONNECTION,
+ new Function<BigInteger, Future<RpcResult<Void>>>() {
+ @Override
+ public ListenableFuture<RpcResult<Void>> apply(final BigInteger IDConnection) {
+ final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version,
+ datapathId);
+ return chainFlowMods(ofFlowModInputs, 0, IDConnection);
+ }
+ });
}
@Override
public Future<RpcResult<RemoveFlowOutput>> removeFlow(final RemoveFlowInput input) {
- return processFlow(new Function() {
- @Override
- public Future<RpcResult<Void>> apply(final BigInteger IDConnection) {
- final List<FlowModInputBuilder> ofFlowModInputs = FlowConvertor.toFlowModInputs(input, version,
- datapathId);
- return provideConnectionAdapter(IDConnection).flowMod(ofFlowModInputs.get(0).build());
- }
- });
+ return this.<RemoveFlowOutput, Void> handleServiceCall(PRIMARY_CONNECTION,
+ new Function<BigInteger, Future<RpcResult<Void>>>() {
+ @Override
+ public Future<RpcResult<Void>> apply(final BigInteger IDConnection) {
+ final FlowModInputBuilder ofFlowModInput = FlowConvertor.toFlowModInput(input, version,
+ datapathId);
+ return createResultForFlowMod(ofFlowModInput, IDConnection);
+ }
+ });
}
@Override
allFlowMods.addAll(ofFlowModInputs);
LOG.debug("Number of flows to push to switch: {}", allFlowMods.size());
Collections.<String> emptyList();
- return this.<UpdateFlowOutput> processFlow(new Function() {
- @Override
- public Future<RpcResult<Void>> apply(final BigInteger cookie) {
- return chainFlowMods(allFlowMods, 0, cookie);
- }
- });
- }
-
- private <T extends DataObject> Future<RpcResult<T>> processFlow(final Function function) {
- LOG.debug("Calling the FlowMod RPC method on MessageDispatchService");
- // use primary connection
-
- final RequestContext requestContext = rpcContext.createRequestContext();
- final SettableFuture<RpcResult<T>> result = rpcContext.storeOrFail(requestContext);
-
- if (!result.isDone()) {
- final Future<RpcResult<Void>> resultFromOFLib = function.apply(connectionID);
-
- RpcResultConvertor<T> rpcResultConvertor = new RpcResultConvertor<>(requestContext);
- rpcResultConvertor.processResultFromOfJava(resultFromOFLib, getWaitTime());
-
- } else {
- requestContext.close();
- }
- return result;
+ return this.<UpdateFlowOutput, Void> handleServiceCall(PRIMARY_CONNECTION,
+ new Function<BigInteger, Future<RpcResult<Void>>>() {
+ @Override
+ public Future<RpcResult<Void>> apply(final BigInteger cookie) {
+ return chainFlowMods(allFlowMods, 0, cookie);
+ }
+ });
}
/**