*/
ConnectionAdapter getConnectionAdapter();
+ /**
+ * Returns reference to OFJava outbound queue provider. Outbound queue is used for outbound messages processing.
+ *
+ * @return
+ */
+ OutboundQueueProvider getOutboundQueueProvider();
+ /**
+ * Method sets reference to OFJava outbound queue provider.
+ *
+ */
+ void setOutboundQueueProvider(OutboundQueueProvider outboundQueueProvider);
/**
* Method returns current connection state.
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.connection;
+
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+
+/**
+ * Created by Martin Bobak <mbobak@cisco.com> on 12.5.2015.
+ */
+public interface OutboundQueueProvider extends OutboundQueueHandler {
+
+ OutboundQueue getOutboundQueue();
+}
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
private NodeId nodeId;
private DeviceDisconnectedHandler deviceDisconnectedHandler;
private static final Logger LOG = LoggerFactory.getLogger(ConnectionContextImpl.class);
+ private OutboundQueueProvider outboundQueueProvider;
/**
* @param connectionAdapter
return connectionAdapter;
}
+ @Override
+ public OutboundQueueProvider getOutboundQueueProvider() {
+ return this.outboundQueueProvider;
+ }
+
+ @Override
+ public void setOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider) {
+ this.outboundQueueProvider = outboundQueueProvider;
+ }
+
@Override
public CONNECTION_STATE getConnectionState() {
return connectionState;
import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
@Override
public void onSwitchConnected(final ConnectionAdapter connectionAdapter) {
+
LOG.trace("preparing handshake: {}", connectionAdapter.getRemoteAddress());
final int handshakeThreadLimit = 1; //TODO: move to constants/parametrize
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.connection;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
+
+/**
+ * Created by Martin Bobak <mbobak@cisco.com> on 12.5.2015.
+ */
+public class OutboundQueueProviderImpl implements OutboundQueueProvider {
+
+ private OutboundQueue outboundQueue;
+ private final short ofVersion;
+
+ public OutboundQueueProviderImpl(final short ofVersion) {
+ this.ofVersion = ofVersion;
+ }
+
+ @Nonnull
+ @Override
+ public BarrierInput createBarrierRequest(@Nonnull final Long xid) {
+ final BarrierInputBuilder biBuilder = new BarrierInputBuilder();
+ biBuilder.setVersion(ofVersion);
+ biBuilder.setXid(xid);
+ return biBuilder.build();
+
+ }
+
+ @Override
+ public void onConnectionQueueChanged(final OutboundQueue outboundQueue) {
+ this.outboundQueue = outboundQueue;
+ }
+
+ @Override
+ public OutboundQueue getOutboundQueue() {
+ return this.outboundQueue;
+ }
+}
+++ /dev/null
-/**
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.impl.device;
-
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.OutstandingMessageExtractor;
-import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * openflowplugin-impl
- * org.opendaylight.openflowplugin.impl.device
- *
- *
- *
- * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *
- * Created: Apr 3, 2015
- */
-public class BarrierProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(BarrierProcessor.class);
-
- /**
- * for all requestContexts from deviceContext cache which are older than barrier (lower barrierXid value) we do: <br>
- * <ul>
- * <li>remove from cache</li>
- * <li>cancel inner future</li>
- * </ul>
- *
- * @param barrierXid
- * @param messageExtractor
- */
- public static void processOutstandingRequests(final long barrierXid, final OutstandingMessageExtractor messageExtractor) {
- LOG.trace("processing barrier response [{}]", barrierXid);
- RequestContext nextRequestContext;
- while ((nextRequestContext = messageExtractor.extractNextOutstandingMessage(barrierXid)) != null ) {
- LOG.trace("flushing outstanding request [{}], closing", nextRequestContext.getXid().getValue());
- nextRequestContext.getFuture().cancel(false);
- RequestContextUtil.closeRequstContext(nextRequestContext);
- }
- }
-}
+++ /dev/null
-/**
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.impl.device;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * openflowplugin-impl
- * org.opendaylight.openflowplugin.impl.device
- * Barrier message self restarting builder.
- *
- * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- * Created: Apr 3, 2015
- */
-public class BarrierTaskBuilder {
-
- private static final Logger LOG = LoggerFactory.getLogger(BarrierTaskBuilder.class);
- public static final long DELAY = 500L;
-
- private final DeviceContext deviceCtx;
-
- public BarrierTaskBuilder(final DeviceContext deviceCtx) {
- this.deviceCtx = Preconditions.checkNotNull(deviceCtx);
- Preconditions.checkNotNull(deviceCtx.getTimer());
- }
-
- public void buildAndFireBarrierTask() {
- Timeout timeout = deviceCtx.getTimer().newTimeout(new BarrierTask(deviceCtx), DELAY, TimeUnit.MILLISECONDS);
- deviceCtx.setCurrentBarrierTimeout(timeout);
- }
-
-
- private final class BarrierTask implements TimerTask {
-
- private final DeviceContext deviceCtx;
-
- public BarrierTask(final DeviceContext deviceCtx) {
- this.deviceCtx = deviceCtx;
- }
-
- /**
- * @return OF-message, ready to send
- */
- private BarrierInput makeBarrier() {
- final BarrierInputBuilder biBuilder = new BarrierInputBuilder();
- biBuilder.setVersion(deviceCtx.getDeviceState().getVersion());
- biBuilder.setXid(deviceCtx.getNextXid().getValue());
- return biBuilder.build();
- }
-
- @Override
- public void run(final Timeout timeout) throws Exception {
- // check outstanding requests first
- if (deviceCtx.getDeviceState().isValid()) {
- if (deviceCtx.getNumberOfOutstandingRequests() > 0) {
- BarrierInput barrierInput = makeBarrier();
- LOG.trace("sending out barrier [{}]", barrierInput.getXid());
- final Future<RpcResult<BarrierOutput>> future = deviceCtx.getPrimaryConnectionContext()
- .getConnectionAdapter().barrier(barrierInput);
- final ListenableFuture<RpcResult<BarrierOutput>> lsFuture = JdkFutureAdapters.listenInPoolThread(future);
- Futures.addCallback(lsFuture, makeCallBack());
- } else {
- // if no requests
- buildAndFireBarrierTask();
- }
- } else {
- LOG.trace("DeviceContext is not valid, will not create next barrier task.");
- }
- }
-
- private FutureCallback<RpcResult<BarrierOutput>> makeCallBack() {
- return new FutureCallback<RpcResult<BarrierOutput>>() {
- @Override
- public void onSuccess(final RpcResult<BarrierOutput> result) {
- if (!result.isSuccessful()) {
- for (RpcError rpcError : result.getErrors()) {
- LOG.trace("Barrier response with error {}", rpcError, rpcError.getCause());
- }
- } else if (null != result.getResult().getXid()) {
- BarrierProcessor.processOutstandingRequests(result.getResult().getXid(), deviceCtx);
- }
- buildAndFireBarrierTask();
- }
-
- @Override
- public void onFailure(final Throwable t) {
- LOG.info("Barrier has failed {} ", t.getMessage());
- LOG.trace("Barrier has failed", t);
- }
- };
- }
-
- }
-
-}
private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
private NotificationPublishService notificationPublishService;
private final ThrottledNotificationsOfferer throttledConnectionsHolder;
- private BlockingQueue<PacketInMessage> bumperQueue;
+ private BlockingQueue<PacketReceived> bumperQueue;
@VisibleForTesting
deviceMeterRegistry = new DeviceMeterRegistryImpl();
messageSpy = _messageSpy;
this.throttledConnectionsHolder = throttledConnectionsHolder;
- bumperQueue = new ArrayBlockingQueue<PacketInMessage>(5000);
+ bumperQueue = new ArrayBlockingQueue<>(5000);
}
/**
messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
}
+ unhookRequestCtx(xid);
try {
requestContext.close();
} catch (final Exception e) {
}
if (throttledConnectionsHolder.isThrottlingEffective(bumperQueue)) {
- boolean caught = bumperQueue.offer(packetInMessage);
+ boolean caught = bumperQueue.offer(packetReceived);
if (!caught) {
LOG.debug("ingress notification dropped - no place in bumper queue [{}]", connectionAdapter.getRemoteAddress());
}
LOG.trace("notification offer interrupted..", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof NotificationRejectedException) {
- applyThrottling(packetInMessage, connectionAdapter);
+ applyThrottling(packetReceived, connectionAdapter);
} else {
LOG.debug("notification offer failed: {}", e.getMessage());
LOG.trace("notification offer failed..", e);
}
}
- private void applyThrottling(PacketInMessage packetInMessage, final ConnectionAdapter connectionAdapter) {
+ private void applyThrottling(PacketReceived packetReceived, final ConnectionAdapter connectionAdapter) {
final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress();
LOG.debug("Notification offer refused by notification service.");
- messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
connectionAdapter.setAutoRead(false);
LOG.debug("Throttling ingress for {}", remoteAddress);
final ListenableFuture<Void> queueDone;
// adding first notification
- bumperQueue.offer(packetInMessage);
+ bumperQueue.offer(packetReceived);
synchronized (bumperQueue) {
queueDone = throttledConnectionsHolder.applyThrottlingOnConnection(bumperQueue);
}
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
+import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
import org.opendaylight.openflowplugin.impl.connection.ThrottledNotificationsOffererImpl;
import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase;
private final List<DeviceContext> deviceContexts = new ArrayList<DeviceContext>();
private final MessageIntelligenceAgency messageIntelligenceAgency;
+ private final long barrierNanos = 500000000L;
+ private final int maxQueueDepth = 1024;
+
public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
@Nonnull final MessageIntelligenceAgency messageIntelligenceAgency) {
this.dataBroker = Preconditions.checkNotNull(dataBroker);
// final phase - we have to add new Device to MD-SAL DataStore
Preconditions.checkNotNull(deviceContext);
((DeviceContextImpl) deviceContext).submitTransaction();
- new BarrierTaskBuilder(deviceContext).buildAndFireBarrierTask();
}
@Override
final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture;
final Short version = connectionContext.getFeatures().getVersion();
+
+ OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
+ connectionContext.getConnectionAdapter().registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
+ connectionContext.setOutboundQueueProvider(outboundQueueProvider);
+
if (OFConstants.OFP_VERSION_1_0 == version) {
final CapabilitiesV10 capabilitiesV10 = connectionContext.getFeatures().getCapabilitiesV10();
private ListenableFuture<RpcResult<List<MultipartReply>>> getNodeStaticInfo(final MultiMsgCollector multiMsgCollector, final MultipartType type, final DeviceContext deviceContext,
final InstanceIdentifier<Node> nodeII, final short version) {
- final Xid xid = deviceContext.getNextXid();
+ final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
+
+ long reservedXid;
+ synchronized (outboundQueue) {
+ reservedXid = outboundQueue.reserveEntry();
+ }
+ final Xid xid = new Xid(reservedXid);
+
final RequestContext<List<MultipartReply>> requestContext = emptyRequestContextStack.createRequestContext();
requestContext.setXid(xid);
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+ final ListenableFuture<RpcResult<List<MultipartReply>>> requestContextFuture = requestContext.getFuture();
multiMsgCollector.registerMultipartXid(xid.getValue());
- Futures.addCallback(requestContext.getFuture(), new FutureCallback<RpcResult<List<MultipartReply>>>() {
+ outboundQueue.commitEntry(reservedXid, MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type), new FutureCallback<OfHeader>() {
+ @Override
+ public void onSuccess(final OfHeader ofHeader) {
+ LOG.info("Static node {} info: {} collected", type);
+ }
+
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.info("Failed to retrieve static node {} info: {}", type, t.getMessage());
+ }
+ });
+
+ Futures.addCallback(requestContextFuture, new FutureCallback<RpcResult<List<MultipartReply>>>() {
@Override
public void onSuccess(final RpcResult<List<MultipartReply>> rpcResult) {
final List<MultipartReply> result = rpcResult.getResult();
final Iterator<RpcError> rpcErrorIterator = rpcResult.getErrors().iterator();
while (rpcErrorIterator.hasNext()) {
final RpcError rpcError = rpcErrorIterator.next();
- LOG.info("Failed to retrieve static node {} info: {}", type, rpcError.getMessage());
+ LOG.info("Failed to retrieve static node {} info: {}", type, rpcError.getMessage());
if (null != rpcError.getCause()) {
LOG.trace("Detailed error:", rpcError.getCause());
}
}
@Override
- public void onFailure(final Throwable t) {
- LOG.info("Failed to retrieve static node {} info: {}", type, t.getMessage());
+ public void onFailure(final Throwable throwable) {
+
}
});
+
+/*
final ListenableFuture<RpcResult<Void>> rpcFuture = JdkFutureAdapters.listenInPoolThread(deviceContext.getPrimaryConnectionContext().getConnectionAdapter()
.multipartRequest(MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type)));
final OFJResult2RequestCtxFuture OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture(requestContext, deviceContext);
OFJResult2RequestCtxFuture.processResultFromOfJava(rpcFuture);
+*/
return requestContext.getFuture();
}
import java.math.BigInteger;
import java.util.concurrent.Future;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
this.primaryConnectionAdapter = deviceContext.getPrimaryConnectionContext().getConnectionAdapter();
this.messageSpy = deviceContext.getMessageSpy();
}
+
public static BigInteger getPrimaryConnection() {
return PRIMARY_CONNECTION;
}
- public short getVersion(){
+ public short getVersion() {
return version;
}
public <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final BigInteger connectionID,
final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function) {
DataCrateBuilder<T> dataCrateBuilder = DataCrateBuilder.<T>builder();
- return handleServiceCall(connectionID, function, dataCrateBuilder);
+ return handleServiceCall(function, dataCrateBuilder);
}
+
public <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function) {
DataCrateBuilder<T> dataCrateBuilder = DataCrateBuilder.<T>builder();
- return handleServiceCall(PRIMARY_CONNECTION, function, dataCrateBuilder);
+ return handleServiceCall(function, dataCrateBuilder);
}
/**
* @param <T>
* @param <F>
- * @param connectionID
* @param function
* @param dataCrateBuilder predefined data
* @return
*/
- public final <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final BigInteger connectionID,
- final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function,
+ public final <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function,
final DataCrateBuilder<T> dataCrateBuilder) {
LOG.trace("Handling general service call");
final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
if (result.isDone()) {
messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED);
+ LOG.trace("Request context refused.");
return result;
}
- DataCrate<T> dataCrate = dataCrateBuilder.setiDConnection(connectionID).setRequestContext(requestContext)
+
+ long reservedXid;
+ final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
+ synchronized (outboundQueue) {
+ reservedXid = outboundQueue.reserveEntry();
+ }
+ final Xid xid = new Xid(reservedXid);
+ requestContext.setXid(xid);
+ DataCrate<T> dataCrate = dataCrateBuilder.setRequestContext(requestContext)
.build();
final ListenableFuture<RpcResult<F>> resultFromOFLib;
- requestContext.setXid(deviceContext.getNextXid());
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
- deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+ deviceContext.hookRequestCtx(xid, requestContext);
messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_READY_FOR_SUBMIT);
synchronized (deviceContext) {
import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
return future;
}
- private <T> ListenableFuture<RpcResult<T>> processFlowModInputBuilders(
- final List<FlowModInputBuilder> ofFlowModInputs) {
+ private <T> ListenableFuture<RpcResult<T>> processFlowModInputBuilders(final List<FlowModInputBuilder> ofFlowModInputs) {
+
final List<ListenableFuture<RpcResult<T>>> partialFutures = new ArrayList<>();
for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) {
DataCrateBuilder<T> dataCrateBuilder = DataCrateBuilder.<T>builder().setFlowModInputBuilder(flowModInputBuilder);
ListenableFuture<RpcResult<T>> partialFuture = handleServiceCall(
- getPrimaryConnection(),
new Function<DataCrate<T>, ListenableFuture<RpcResult<Void>>>() {
@Override
public ListenableFuture<RpcResult<Void>> apply(final DataCrate<T> data) {
}
protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data, final FlowModInputBuilder flowModInputBuilder) {
- final Xid xid = data.getRequestContext().getXid();
- flowModInputBuilder.setXid(xid.getValue());
+ final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
+ long xid = data.getRequestContext().getXid().getValue();
+ flowModInputBuilder.setXid(xid);
final FlowModInput flowModInput = flowModInputBuilder.build();
- Future<RpcResult<Void>> flowModResult = provideConnectionAdapter(data.getiDConnection()).flowMod(
- flowModInput);
- final ListenableFuture<RpcResult<Void>> result = JdkFutureAdapters.listenInPoolThread(flowModResult);
- return result;
+ final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+ outboundQueue.commitEntry(xid, flowModInput, new FutureCallback<OfHeader>() {
+ @Override
+ public void onSuccess(final OfHeader ofHeader) {
+ settableFuture.set(RpcResultBuilder.<Void>success().build());
+ getMessageSpy().spyMessage(flowModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, throwable.getMessage());
+ settableFuture.set(rpcResultBuilder.build());
+ getMessageSpy().spyMessage(flowModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+ }
+ });
+ return settableFuture;
}
}
package org.opendaylight.openflowplugin.impl.statistics.services.dedicated;
import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
makeMultipartRequestInput(xid.getValue(),
getVersion(),
type);
- final Future<RpcResult<Void>> resultFromOFLib = deviceContext.getPrimaryConnectionContext()
- .getConnectionAdapter().multipartRequest(multipartRequestInput);
- return JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+ final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
+ final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+ synchronized (outboundQueue){
+ outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
+ @Override
+ public void onSuccess(final OfHeader ofHeader) {
+ settableFuture.set(RpcResultBuilder.<Void>success().build());
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
+ settableFuture.set(rpcResultBuilder.build());
+ }
+ });
+ }
+ return settableFuture;
}
}
*/
package org.opendaylight.openflowplugin.impl.util;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SalEchoService;
-import org.opendaylight.openflowplugin.impl.services.FlowCapableTransactionServiceImpl;
-
-import org.opendaylight.openflowplugin.impl.services.SalEchoServiceImpl;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
+import org.opendaylight.openflowplugin.impl.services.FlowCapableTransactionServiceImpl;
import org.opendaylight.openflowplugin.impl.services.NodeConfigServiceImpl;
import org.opendaylight.openflowplugin.impl.services.PacketProcessingServiceImpl;
+import org.opendaylight.openflowplugin.impl.services.SalEchoServiceImpl;
import org.opendaylight.openflowplugin.impl.services.SalFlowServiceImpl;
import org.opendaylight.openflowplugin.impl.services.SalGroupServiceImpl;
import org.opendaylight.openflowplugin.impl.services.SalMeterServiceImpl;
import org.opendaylight.openflowplugin.impl.services.SalTableServiceImpl;
import org.opendaylight.openflowplugin.impl.statistics.services.OpendaylightFlowStatisticsServiceImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SalEchoService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.module.config.rev141015.NodeConfigService;
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.impl.device;
-
-import com.google.common.util.concurrent.SettableFuture;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.OutstandingMessageExtractor;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-/**
- * Created by mirehak on 4/5/15.
- */
-@RunWith(MockitoJUnitRunner.class)
-public class BarrierProcessorTest {
-
- private static final long XID = 42L;
- @Mock
- private OutstandingMessageExtractor messageExtractor;
- @Mock
- private RequestContext<String> extractedReqCtx;
-
- private SettableFuture<RpcResult<String>> settableFuture;
-
- @Before
- public void setUp() throws Exception {
- settableFuture = SettableFuture.create();
- Mockito.when(messageExtractor.extractNextOutstandingMessage(Matchers.anyLong()))
- .thenReturn(extractedReqCtx, extractedReqCtx, null);
- Mockito.when(extractedReqCtx.getFuture()).thenReturn(settableFuture);
- Mockito.when(extractedReqCtx.getXid()).thenReturn(new Xid(41L), new Xid(42L));
- }
-
- @After
- public void tearDown() throws Exception {
- Mockito.verifyNoMoreInteractions(messageExtractor);
- }
-
- @Test
- public void testProcessOutstandingRequests() throws Exception {
- BarrierProcessor.processOutstandingRequests(XID, messageExtractor);
-
- Mockito.verify(messageExtractor, Mockito.times(3)).extractNextOutstandingMessage(XID);
- Assert.assertTrue(settableFuture.isCancelled());
- }
-}
\ No newline at end of file