All necessary work is done in callbacks registered when commiting entry in outbound queue.
That makes registration of req. context inside device context worthless. Collection of
multiparts is done in message collector on request context registered when entry is commited.
Request contexts not held by DeviceManager and barrier reuqests issued by OFJ makes
OutsandingMessageExtractor useless as well.
Change-Id: Ib259020ed4e3cca56463696827bb24bf7bc97069
Signed-off-by: Martin Bobak <mbobak@cisco.com>
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MessageHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.OutstandingMessageExtractor;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
OpenFlowPluginTimer,
MessageHandler,
TranslatorLibrarian,
- OutstandingMessageExtractor,
DeviceReplyProcessor,
DeviceDisconnectedHandler {
*/
ConnectionContext getAuxiliaryConnectiobContexts(BigInteger cookie);
-
- /**
- * @param xid key
- * @return request by xid
- */
- RequestContext<?> lookupRequest(Xid xid);
-
- /**
- * @return number of outstanding requests in map
- */
- int getNumberOfOutstandingRequests();
-
- /**
- * Method writes request context into request context map. This method
- * is ment to be used by org.opendaylight.openflowplugin.impl.services.OFJResult2RequestCtxFuture#processResultFromOfJava.
- *
- * @param xid
- * @param requestFutureContext
- */
- void hookRequestCtx(Xid xid, RequestContext<?> requestFutureContext);
-
- /**
- * Method removes request context from request context map.
- *
- * @param xid
- */
- RequestContext<?> unhookRequestCtx(Xid xid);
-
/**
* Method exposes flow registry used for storing flow ids identified by calculated flow hash.
*
package org.opendaylight.openflowplugin.api.openflow.device.handlers;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import javax.annotation.Nonnull;
final int DEFAULT_TIME_OUT = 10;
/**
- * Method registers a transaction id xid to the Multipart messages collector
+ * Method registers a requst context to the Multipart messages collector
* and returns Settable future with all MultipartReply. Method has to be called before
* send a request to the device, otherwise there is a small possibility to miss a first msg.
*
- * @param xid
+ * @param requestContext
*/
- void registerMultipartXid(long xid);
+ void registerMultipartRequestContext(RequestContext requestContext);
/**
* Method adds a reply multipart message to the collection and if the message has marker
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.api.openflow.device.handlers;
-
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-
-/**
- * Created by mirehak on 4/3/15.
- */
-public interface OutstandingMessageExtractor {
-
- /**
- * @param barrierXid
- * @return next older message, delivered messages will be removed from original cache
- */
- RequestContext<?> extractNextOutstandingMessage(long barrierXid);
-}
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
-import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final HashedWheelTimer hashedWheelTimer;
- private final Map<Long, RequestContext<?>> requests = new TreeMap<>();
-
private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
private final TransactionChainManager txChainManager;
private TranslatorLibrary translatorLibrary;
return auxiliaryConnectionContexts.get(new SwitchConnectionCookieOFImpl(cookie.longValue()));
}
- @Override
- public RequestContext<?> lookupRequest(final Xid xid) {
- synchronized (requests) {
- return requests.get(xid.getValue());
- }
- }
-
- @Override
- public int getNumberOfOutstandingRequests() {
- synchronized (requests) {
- return requests.size();
- }
- }
-
- @Override
- public void hookRequestCtx(final Xid xid, final RequestContext<?> requestFutureContext) {
- synchronized (requests) {
- requests.put(xid.getValue(), requestFutureContext);
- }
- }
-
- @Override
- public RequestContext<?> unhookRequestCtx(final Xid xid) {
- synchronized (requests) {
- return requests.remove(xid.getValue());
- }
- }
@Override
public DeviceFlowRegistry getDeviceFlowRegistry() {
@Override
public void processReply(final OfHeader ofHeader) {
- final RequestContext requestContext = requests.remove(ofHeader.getXid());
- if (null != requestContext) {
- RpcResult<OfHeader> rpcResult;
- if (ofHeader instanceof Error) {
- //TODO : this is the point, where we can discover that add flow operation failed and where we should
- //TODO : remove this flow from deviceFlowRegistry
- final Error error = (Error) ofHeader;
- final String message = "Operation on device failed with xid " + ofHeader.getXid() + ".";
- rpcResult = RpcResultBuilder
- .<OfHeader>failed()
- .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
- .build();
- messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
- } else {
- rpcResult = RpcResultBuilder
- .<OfHeader>success()
- .withResult(ofHeader)
- .build();
- messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
- }
-
- requestContext.setResult(rpcResult);
- try {
- requestContext.close();
- } catch (final Exception e) {
- LOG.warn("Closing RequestContext failed: {}", e.getMessage());
- LOG.debug("Closing RequestContext failed.. ", e);
- }
+ RpcResult<OfHeader> rpcResult;
+ if (ofHeader instanceof Error) {
+ messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
} else {
- LOG.warn("Can't find request context registered for xid : {}. Type of reply: {}. From address: {}", ofHeader.getXid(), ofHeader.getClass().getName(),
- getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
+ messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
}
+
}
@Override
public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
- final RequestContext requestContext;
- synchronized (requests) {
- requestContext = requests.remove(xid.getValue());
- }
- if (null != requestContext) {
- final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
- .<List<MultipartReply>>success()
- .withResult(ofHeaderList)
- .build();
- requestContext.setResult(rpcResult);
- for (final MultipartReply multipartReply : ofHeaderList) {
- messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
- }
-
- unhookRequestCtx(xid);
- try {
- requestContext.close();
- } catch (final Exception e) {
- LOG.warn("Closing RequestContext failed: {}", e.getMessage());
- LOG.debug("Closing RequestContext failed.. ", e);
- }
- } else {
- LOG.warn("Can't find request context registered for xid : {}. Type of reply: MULTIPART. From address: {}", xid.getValue(),
- getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress());
+ for (final MultipartReply multipartReply : ofHeaderList) {
+ messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
}
}
@Override
public void processException(final Xid xid, final DeviceDataException deviceDataException) {
-
- LOG.trace("Processing exception for xid : {}", xid.getValue());
-
- final RequestContext requestContext = requests.remove(xid.getValue());
-
- if (null != requestContext) {
- final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
- .<List<OfHeader>>failed()
- .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
- .build();
- requestContext.setResult(rpcResult);
- messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
- try {
- requestContext.close();
- } catch (final Exception e) {
- LOG.warn("Closing RequestContext failed: ", e);
- LOG.debug("Closing RequestContext failed..", e);
- }
- } else {
- LOG.warn("Can't find request context registered for xid : {}. Exception message {}",
- xid.getValue(), deviceDataException.getMessage());
- }
+ messageSpy.spyMessage(deviceDataException.getClass(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
}
@Override
primaryConnectionContext.setConnectionState(ConnectionContext.CONNECTION_STATE.RIP);
primaryConnectionContext.getConnectionAdapter().disconnect();
}
- for (final Map.Entry<Long, RequestContext<?>> entry : requests.entrySet()) {
- RequestContextUtil.closeRequestContextWithRpcError(entry.getValue(), DEVICE_DISCONNECTED);
- }
for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) {
if (connectionContext.getConnectionAdapter().isAlive()) {
connectionContext.getConnectionAdapter().disconnect();
}
}
- @Override
- public RequestContext<?> extractNextOutstandingMessage(final long barrierXid) {
- RequestContext<?> nextMessage = null;
- synchronized (requests) {
- final Iterator<Long> keyIterator = requests.keySet().iterator();
- if (keyIterator.hasNext()) {
- final Long oldestXid = keyIterator.next();
- if (oldestXid < barrierXid) {
- nextMessage = requests.remove(oldestXid);
- }
- }
- }
- return nextMessage;
- }
-
@Override
public void setCurrentBarrierTimeout(final Timeout timeout) {
barrierTaskTimeout = timeout;
final Xid xid = requestContext.getXid();
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
- deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
final ListenableFuture<RpcResult<List<MultipartReply>>> requestContextFuture = requestContext.getFuture();
final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector();
- multiMsgCollector.registerMultipartXid(xid.getValue());
+ multiMsgCollector.registerMultipartRequestContext(requestContext);
createSuccessProcessingCallback(type, deviceContext, nodeII, requestContextFuture);
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ * <p>
* openflowplugin-api
* org.opendaylight.openflowplugin.impl.openflow.device
- * <p>
+ *
* Implementation for {@link MultiMsgCollector} interface
*
* @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
* @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
- * <p>
+ * </p>
* Created: Mar 23, 2015
*/
@VisibleForTesting
}
@Override
- public void registerMultipartXid(final long xid) {
- cache.put(xid, new MultiCollectorObject());
+ public void registerMultipartRequestContext(final RequestContext requestContext) {
+ cache.put(requestContext.getXid().getValue(), new MultiCollectorObject(requestContext));
}
@Override
MultipartType multipartType = reply.getType();
LOG.trace("Orphaned multipart msg with XID : {} of type {}", xid, multipartType);
deviceReplyProcessor.processException(new Xid(xid),
- new DeviceDataException("unknown xid received for multipart of type "+multipartType));
+ new DeviceDataException("unknown xid received for multipart of type " + multipartType));
return;
}
private class MultiCollectorObject {
private final List<MultipartReply> replyCollection;
private MultipartType msgType;
+ private final RequestContext requestContext;
- MultiCollectorObject() {
+ MultiCollectorObject(final RequestContext requestContext) {
replyCollection = new ArrayList<>();
+ this.requestContext = requestContext;
}
void add(final MultipartReply reply) throws DeviceDataException {
}
void publishCollection(final long xid) {
+ final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder
+ .<List<MultipartReply>>success()
+ .withResult(replyCollection)
+ .build();
+ requestContext.setResult(rpcResult);
+ try {
+ requestContext.close();
+ } catch (final Exception e) {
+ LOG.warn("Closing RequestContext failed: {}", e.getMessage());
+ LOG.debug("Closing RequestContext failed.. ", e);
+ }
deviceReplyProcessor.processReply(new Xid(xid), replyCollection);
}
void invalidateFutureByTimeout(final long key) {
final String msg = "MultiMsgCollector can not wait for last multipart any more";
- deviceReplyProcessor.processException(new Xid(key), new DeviceDataException(msg));
+ DeviceDataException deviceDataException = new DeviceDataException(msg);
+ final RpcResult<List<OfHeader>> rpcResult = RpcResultBuilder
+ .<List<OfHeader>>failed()
+ .withError(RpcError.ErrorType.APPLICATION, String.format("Message processing failed : %s", deviceDataException.getError()), deviceDataException)
+ .build();
+ requestContext.setResult(rpcResult);
+ try {
+ requestContext.close();
+ } catch (final Exception e) {
+ LOG.warn("Closing RequestContext failed: ", e);
+ LOG.debug("Closing RequestContext failed..", e);
+ }
+ deviceReplyProcessor.processException(new Xid(key), deviceDataException);
+ }
+
+ public RequestContext getRequestContext() {
+ return requestContext;
}
private void msgTypeValidation(final MultipartType type, final long key) throws DeviceDataException {
}
final ListenableFuture<RpcResult<F>> resultFromOFLib;
- LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
- deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
-
messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_READY_FOR_SUBMIT);
function.apply(requestContext);
barrierInputOFJavaBuilder.setXid(xid.getValue());
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
- deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider();
final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
@Override
public void onSuccess(final OfHeader ofHeader) {
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(barrierInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
settableFuture.set(RpcResultBuilder.<Void>success().build());
public void onFailure(final Throwable throwable) {
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(barrierInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
settableFuture.set(rpcResultBuilder.build());
}
@Override
public void onSuccess(final OfHeader ofHeader) {
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(setConfigInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
settableFuture.set(RpcResultBuilder.<SetConfigOutput>success().build());
public void onFailure(final Throwable throwable) {
RpcResultBuilder<SetConfigOutput> rpcResultBuilder = RpcResultBuilder.<SetConfigOutput>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(setConfigInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
settableFuture.set(rpcResultBuilder.build());
}
LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
final Xid xid = requestContext.getXid();
- deviceContext.hookRequestCtx(xid , requestContext);
-
final EchoInputBuilder echoInputOFJavaBuilder = new EchoInputBuilder();
echoInputOFJavaBuilder.setVersion(getVersion());
echoInputOFJavaBuilder.setXid(requestContext.getXid().getValue());
@Override
public void onSuccess(final OfHeader ofHeader) {
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(echoInputOFJava.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
settableFuture.set(RpcResultBuilder.<SendEchoOutput>success().build());
public void onFailure(final Throwable throwable) {
RpcResultBuilder<SendEchoOutput> rpcResultBuilder = RpcResultBuilder.<SendEchoOutput>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(echoInputOFJava.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
settableFuture.set(rpcResultBuilder.build());
}
@Override
public void onSuccess(final OfHeader ofHeader) {
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
settableFuture.set(RpcResultBuilder.<Void>success().build());
public void onFailure(final Throwable throwable) {
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, throwable.getMessage(), throwable);
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
settableFuture.set(rpcResultBuilder.build());
}
});
@Override
public void onSuccess(final OfHeader ofHeader) {
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(groupModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
settableFuture.set(RpcResultBuilder.<Void>success().build());
public void onFailure(final Throwable throwable) {
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(groupModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
settableFuture.set(rpcResultBuilder.build());
}
@Override
public void onSuccess(final OfHeader ofHeader) {
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(meterModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
settableFuture.set(RpcResultBuilder.<Void>success().build());
public void onFailure(final Throwable throwable) {
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(meterModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
settableFuture.set(rpcResultBuilder.build());
}
@Override
public void onSuccess(final OfHeader ofHeader) {
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(portModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
settableFuture.set(RpcResultBuilder.<Void>success().build());
public void onFailure(final Throwable throwable) {
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
getMessageSpy().spyMessage(portModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
settableFuture.set(rpcResultBuilder.build());
}
// Set request body to main multipart request
final Xid xid = requestContext.getXid();
- getDeviceContext().getMultiMsgCollector().registerMultipartXid(xid.getValue());
final MultipartRequestInputBuilder mprInput = createMultipartHeader(MultipartType.OFPMPTABLEFEATURES,
xid.getValue());
mprInput.setMultipartRequestBody(caseBuilder.build());
final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
final MultiMsgCollector multiMsgCollector = getDeviceContext().getMultiMsgCollector();
- multiMsgCollector.registerMultipartXid(xid.getValue());
+ multiMsgCollector.registerMultipartRequestContext(requestContext);
final MultipartRequestInput multipartRequestInput = mprInput.build();
outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
public void onFailure(final Throwable throwable) {
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage(), throwable);
RequestContextUtil.closeRequstContext(requestContext);
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
+ multiMsgCollector.registerMultipartRequestContext(requestContext);
getMessageSpy().spyMessage(multipartRequestInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
settableFuture.set(rpcResultBuilder.build());
}
public ListenableFuture<RpcResult<Void>> apply(final RequestContext<List<MultipartReply>> requestContext) {
final Xid xid = requestContext.getXid();
final DeviceContext deviceContext = getDeviceContext();
- deviceContext.getMultiMsgCollector().registerMultipartXid(xid.getValue());
+ deviceContext.getMultiMsgCollector().registerMultipartRequestContext(requestContext);
final MultipartRequestAggregateCaseBuilder multipartRequestAggregateCaseBuilder = new MultipartRequestAggregateCaseBuilder();
final MultipartRequestAggregateBuilder mprAggregateRequestBuilder = new MultipartRequestAggregateBuilder();
final short tableId = MoreObjects.firstNonNull(input.getTableId(), OFConstants.OFPTT_ALL).shortValue();
final DeviceContext deviceContext = getDeviceContext();
final MultiMsgCollector multiMsgCollector = deviceContext.getMultiMsgCollector();
- multiMsgCollector.registerMultipartXid(xid.getValue());
+ multiMsgCollector.registerMultipartRequestContext(requestContext);
MultipartRequestInput multipartRequestInput = MultipartRequestInputFactory.
makeMultipartRequestInput(xid.getValue(),
getVersion(),
@Override
public void onFailure(final Throwable throwable) {
RpcResultBuilder<Void> rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
- getDeviceContext().unhookRequestCtx(requestContext.getXid());
RequestContextUtil.closeRequstContext(requestContext);
settableFuture.set(rpcResultBuilder.build());
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+
import com.google.common.util.concurrent.SettableFuture;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
OutboundQueueProvider outboundQueueProvider;
private final AtomicLong atomicLong = new AtomicLong(0);
+
@Before
public void setUp() {
Mockito.when(dataBroker.createTransactionChain(Mockito.any(TransactionChainManager.class))).thenReturn(txChainFactory);
Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
Mockito.when(connectionContext.getOutboundQueueProvider()).thenReturn(outboundQueueProvider);
- deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder);
+ deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker, timer, messageIntelligenceAgency, throttledConnectionsHolder);
xid = new Xid(atomicLong.incrementAndGet());
xidMulti = new Xid(atomicLong.incrementAndGet());
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullConnectionContext() {
- new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder).close();
+ new DeviceContextImpl(null, deviceState, dataBroker, timer, messageIntelligenceAgency, throttledConnectionsHolder).close();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullDataBroker() {
- new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency,throttledConnectionsHolder).close();
+ new DeviceContextImpl(connectionContext, deviceState, null, timer, messageIntelligenceAgency, throttledConnectionsHolder).close();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullDeviceState() {
- new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency,throttledConnectionsHolder).close();
+ new DeviceContextImpl(connectionContext, null, dataBroker, timer, messageIntelligenceAgency, throttledConnectionsHolder).close();
}
@Test(expected = NullPointerException.class)
public void testDeviceContextImplConstructorNullTimer() {
- new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency,throttledConnectionsHolder).close();
+ new DeviceContextImpl(null, deviceState, dataBroker, null, messageIntelligenceAgency, throttledConnectionsHolder).close();
}
@Test
return asyncOutputBuilder.build();
}
- @Test
- public void testProcessReply() {
- final GetAsyncOutput asyncOutput = createAsyncOutput(xid);
- LOG.info("Hooking RequestContext");
- deviceContext.hookRequestCtx(xid, requestContext);
- Assert.assertEquals(requestContext, deviceContext.lookupRequest(xid));
-
- Assert.assertFalse(requestContext.getFuture().isDone());
- LOG.info("Sending reply from device");
- deviceContext.processReply(asyncOutput);
- Assert.assertTrue(requestContext.getFuture().isDone());
-
- LOG.info("Checking RequestContext.future");
- try {
- final Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
- final RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
- final GetAsyncOutput getAsyncOutput = (GetAsyncOutput) rpcResult.getResult();
- assertEquals(asyncOutput.getVersion(), getAsyncOutput.getVersion());
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.error("Test failed when checking RequestContext.future", e);
- fail("fail");
- }
- Assert.assertTrue(deviceContext.getNumberOfOutstandingRequests() == 0);
- }
private static Error createError(final Xid xid) {
final ErrorMessageBuilder errorMessageBuilder = new ErrorMessageBuilder();
return errorMessageBuilder.build();
}
- @Test
- public void testProcessReplyError() {
- LOG.info("Hooking RequestContext");
- deviceContext.hookRequestCtx(xid, requestContext);
- Assert.assertEquals(requestContext, deviceContext.lookupRequest(xid));
-
- Assert.assertFalse(requestContext.getFuture().isDone());
- LOG.info("Sending error reply from device");
- final Error error = createError(xid);
- deviceContext.processReply(error);
- Assert.assertTrue(requestContext.getFuture().isDone());
-
- LOG.info("Checking RequestContext.future");
- try {
- final Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
- final RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
- Assert.assertFalse(rpcResult.isSuccessful());
- final List<RpcError> errors = (List<RpcError>) rpcResult.getErrors();
- Assert.assertTrue(errors.get(0).getCause() instanceof DeviceDataException);
- final DeviceDataException cause = (DeviceDataException) errors.get(0).getCause();
- Assert.assertEquals(error, cause.getError());
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.error("Test failed when checking RequestContext.future", e);
- fail("fail");
- }
- Assert.assertTrue(deviceContext.getNumberOfOutstandingRequests() == 0);
- }
-
- @Test
- public void testProcessReplyList() {
- LOG.info("Hooking RequestContext");
- deviceContext.hookRequestCtx(xidMulti, requestContextMultiReply);
- Assert.assertEquals(requestContextMultiReply, deviceContext.lookupRequest(xidMulti));
-
- Assert.assertFalse(requestContextMultiReply.getFuture().isDone());
- LOG.info("Sending reply from device");
- deviceContext.processReply(xidMulti, createMultipartReplyList(xidMulti));
- Assert.assertTrue(requestContextMultiReply.getFuture().isDone());
-
- LOG.info("Checking RequestContext.future");
- try {
- final Object object = requestContextMultiReply.getFuture().get(1L, TimeUnit.SECONDS);
- final RpcResult<List<OfHeader>> rpcResult = (RpcResult<List<OfHeader>>) object;
- final List<OfHeader> multipartReplies = rpcResult.getResult();
- final List<MultipartReply> expectedMpReplies = createMultipartReplyList(xidMulti);
- assertEquals(expectedMpReplies, multipartReplies);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.error("Test failed when checking RequestContext.future", e);
- fail("fail");
- }
- Assert.assertTrue(deviceContext.getNumberOfOutstandingRequests() == 0);
- }
-
private static List<MultipartReply> createMultipartReplyList(final Xid xid) {
final MultipartReplyDesc descValue = new MultipartReplyDescBuilder().setHwDesc("hw-test-value").build();
final MultipartReplyDescCase replyBody = new MultipartReplyDescCaseBuilder()
return multipartReplies;
}
- @Test
- public void testProcessException() {
- LOG.info("Hooking RequestContext");
- deviceContext.hookRequestCtx(xid, requestContext);
- Assert.assertEquals(requestContext, deviceContext.lookupRequest(xid));
-
- Assert.assertFalse(requestContext.getFuture().isDone());
-
- LOG.info("Sending reply from device");
- deviceContext.processException(xid, new DeviceDataException("Some freakin' error", new NullPointerException()));
- Assert.assertTrue(requestContext.getFuture().isDone());
-
- LOG.info("Checking RequestContext.future");
- try {
- final Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
- final RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
- Assert.assertFalse(rpcResult.isSuccessful());
- final List<RpcError> errors = (List<RpcError>) rpcResult.getErrors();
- Assert.assertTrue(errors.get(0).getCause() instanceof DeviceDataException);
- final DeviceDataException cause = (DeviceDataException) errors.get(0).getCause();
- Assert.assertTrue(cause.getCause() instanceof NullPointerException);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.error("Test failed when checking RequestContext.future", e);
- fail("fail");
- }
- Assert.assertTrue(deviceContext.getNumberOfOutstandingRequests() == 0);
- }
}
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceReplyProcessor;
/**
* openflowplugin-api
* org.opendaylight.openflowplugin.impl.openflow.device
- *
+ * <p/>
* Test class for testing basic method functionality for {@link MultiMsgCollector}
*
* @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
* @author <a href="mailto:tkubas@cisco.com">Timotej Kubas</a>
- *
- * Created: Mar 23, 2015
+ * <p/>
+ * Created: Mar 23, 2015
*/
@RunWith(MockitoJUnitRunner.class)
public class MultiMsgCollectorImplTest {
ArgumentCaptor<Xid> xidCaptor;
@Captor
ArgumentCaptor<List<MultipartReply>> mmCaptor;
+ @Mock
+ RequestContext requestContext;
+ final Long xid = 1L;
+
private final String hwTestValue = "test-value";
private final String expectedExpirationMsg = "MultiMsgCollector can not wait for last multipart any more";
collector = new MultiMsgCollectorImpl(1);
collector.setDeviceReplyProcessor(deviceProcessor);
cleanUpCheck = Runnables.doNothing();
+ Mockito.when(requestContext.getXid()).thenReturn(new Xid(xid));
}
@After
Thread.sleep(1100L);
// flush cache action
- collector.registerMultipartXid(0L);
+ collector.registerMultipartRequestContext(requestContext);
cleanUpCheck.run();
Mockito.verifyNoMoreInteractions(deviceProcessor);
}
/**
* test of ${link MultiMsgCollector#addMultipartMsg} <br>
- * success with message consisting of 1 part
+ * success with message consisting of 1 part
*/
@Test
public void testAddMultipartMsgOne() {
final Long xid = 1L;
- collector.registerMultipartXid(xid);
+ collector.registerMultipartRequestContext(requestContext);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false).build());
Mockito.verify(deviceProcessor).processReply(xidCaptor.capture(), mmCaptor.capture());
}
/**
- * test of ${link MultiMsgCollector#addMultipartMsg} <br>
- * success with message consisting of 2 parts
+ * test of ${link MultiMsgCollector#addMultipartMsg} <br>
+ * success with message consisting of 2 parts
*/
@Test
public void testAddMultipartMsgTwo() {
final Long xid = 1L;
- collector.registerMultipartXid(xid);
+ collector.registerMultipartRequestContext(requestContext);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false).build());
/**
* test of ${link MultiMsgCollector#addMultipartMsg} <br>
- * xid not registered before message
+ * xid not registered before message
*/
@Test
public void testAddMultipartMsgNotExpectedXid() {
/**
* test of ${link MultiMsgCollector#addMultipartMsg} <br>
- * message types are inconsistent - second message is final and should be rejected
+ * message types are inconsistent - second message is final and should be rejected
*/
@Test
public void testAddMultipartMsgWrongType1() {
- final Long xid = 1L;
- collector.registerMultipartXid(xid);
+ collector.registerMultipartRequestContext(requestContext);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, false)
.setType(MultipartType.OFPMPPORTDESC).build());
/**
* test of ${link MultiMsgCollector#addMultipartMsg} <br>
- * message types are inconsistent - second message is not final and should be rejected
+ * message types are inconsistent - second message is not final and should be rejected
*/
@Test
public void testAddMultipartMsgWrongType2() {
final Long xid = 1L;
- collector.registerMultipartXid(xid);
+ collector.registerMultipartRequestContext(requestContext);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true)
.setType(MultipartType.OFPMPPORTDESC).build());
/**
* test of ${link MultiMsgCollector#addMultipartMsg} <br>
- * message types are inconsistent - second message and third should be rejected
+ * message types are inconsistent - second message and third should be rejected
*/
@Test
public void testAddMultipartMsgWrongType3() {
final Long xid = 1L;
- collector.registerMultipartXid(xid);
+ collector.registerMultipartRequestContext(requestContext);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true)
.setType(MultipartType.OFPMPPORTDESC).build());
/**
* test of ${link MultiMsgCollector#addMultipartMsg} <br>
- * no second message arrived within expiration limit - first message should expire
+ * no second message arrived within expiration limit - first message should expire
*/
@Test
public void testAddMultipartMsgExpiration() throws InterruptedException {
final Long xid = 1L;
- collector.registerMultipartXid(xid);
+ collector.registerMultipartRequestContext(requestContext);
collector.addMultipartMsg(MsgGeneratorTestUtils.makeMultipartDescReply(xid, hwTestValue, true).build());
cleanUpCheck = new Runnable() {