import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestFutureContext;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
-import org.opendaylight.openflowplugin.api.openflow.device.XidGenerator;
+import org.opendaylight.openflowplugin.api.openflow.device.*;
+import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableFeatures;
import org.opendaylight.yangtools.yang.binding.ChildOf;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import javax.annotation.Nonnull;
import java.math.BigInteger;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
/**
*
*/
-public class DeviceContextImpl implements DeviceContext, TransactionChainListener {
+public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final XidGenerator xidGenerator;
+ private Map<Long, RequestContext> requests =
+ new HashMap<Long, RequestContext>();
- private final Map<Xid, RequestFutureContext> requests;
private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
private BindingTransactionChain txChainFactory;
return xidGenerator.generate();
}
- @Override
- public Map<Xid, RequestFutureContext> getRequests() {
+ public Map<Long, RequestContext> getRequests() {
return requests;
}
@Override
- public void hookRequestCtx(final Xid xid, final RequestFutureContext requestFutureContext) {
- requests.put(xid, requestFutureContext);
+ public void hookRequestCtx(Xid xid, RequestContext requestFutureContext) {
+ // TODO Auto-generated method stub
+ requests.put(xid.getValue(), requestFutureContext);
+ }
+
+ @Override
+ public void processReply(OfHeader ofHeader) {
+ RequestContext requestContext = getRequests().get(ofHeader.getXid());
+ SettableFuture replyFuture = requestContext.getFuture();
+ getRequests().remove(ofHeader.getXid());
+ RpcResult<OfHeader> rpcResult;
+
+ if(ofHeader instanceof Error) {
+ Error error = (Error) ofHeader;
+ String message = "Operation on device failed";
+ rpcResult= RpcResultBuilder
+ .<OfHeader>failed()
+ .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
+ .build();
+ } else {
+ rpcResult= RpcResultBuilder
+ .<OfHeader>success()
+ .withResult(ofHeader)
+ .build();
+ }
+
+ replyFuture.set(rpcResult);
+ try {
+ requestContext.close();
+ } catch (Exception e) {
+ LOG.error("Closing RequestContext failed: ", e);
+ }
}
@Override
- public void processReply(final Xid xid, final OfHeader ofHeader) {
- final SettableFuture replyFuture = getRequests().get(xid).getFuture();
- replyFuture.set(ofHeader);
+ public void processReply(Xid xid, List<OfHeader> ofHeaderList) {
+ RequestContext requestContext = getRequests().get(xid.getValue());
+ SettableFuture replyFuture = requestContext.getFuture();
+ getRequests().remove(xid.getValue());
+ RpcResult<List<OfHeader>> rpcResult= RpcResultBuilder
+ .<List<OfHeader>>success()
+ .withResult(ofHeaderList)
+ .build();
+ replyFuture.set(rpcResult);
+ try {
+ requestContext.close();
+ } catch (Exception e) {
+ LOG.error("Closing RequestContext failed: ", e);
+ }
}
@Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
- final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+ public void processException(Xid xid, DeviceDataException deviceDataException) {
+ RequestContext requestContext = getRequests().get(xid.getValue());
+
+ SettableFuture replyFuture = requestContext.getFuture();
+ getRequests().remove(xid.getValue());
+ RpcResult<List<OfHeader>> rpcResult= RpcResultBuilder
+ .<List<OfHeader>>failed()
+ .withError(RpcError.ErrorType.APPLICATION, "Message processing failed", deviceDataException)
+ .build();
+ replyFuture.set(rpcResult);
+ try {
+ requestContext.close();
+ } catch (Exception e) {
+ LOG.error("Closing RequestContext failed: ", e);
+ }
+ }
+
+ @Override
+ public void onTransactionChainFailed(TransactionChain<?, ?> chain,
+ AsyncTransaction<?, ?> transaction, Throwable cause) {
txChainFactory.close();
txChainFactory = dataBroker.createTransactionChain(DeviceContextImpl.this);
+
}
@Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
- // NOOP - only yet, here is probably place for notification to get new WriteTransaction
+ public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+ // NOOP - only yet, here is probably place for notification to get new WriteTransaction
+
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.device.XidGenerator;
+import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessageBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.async.body.grouping.FlowRemovedMask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.async.body.grouping.PacketInMask;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.async.body.grouping.PortStatusMask;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDesc;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDescBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.SettableFuture;
+
@RunWith(MockitoJUnitRunner.class)
public class DeviceContextImplTest {
private static final Logger LOG = LoggerFactory
.getLogger(DeviceContextImplTest.class);
XidGenerator xidGen;
+
+ Xid xid;
+ Xid xidMulti;
DeviceContextImpl deviceContext;
@Mock
RequestContext<GetAsyncReply> requestContext;
+ @Mock
+ RequestContext<MultipartReply> requestContextMultiReply;
+
@Mock
ConnectionContext connectionContext;
@Mock
@Before
public void setUp() {
+ SettableFuture<RpcResult<GetAsyncReply>> settableFuture = SettableFuture.create();
+ SettableFuture<RpcResult<MultipartReply>> settableFutureMultiReply = SettableFuture.create();
+ Mockito.when(requestContext.getFuture()).thenReturn(settableFuture);
+ Mockito.when(requestContextMultiReply.getFuture()).thenReturn(settableFutureMultiReply);
Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
Mockito.when(dataBroker.createTransactionChain(Mockito.any(DeviceContextImpl.class))).thenReturn(txChainFactory);
Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
+
deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker);
+ xid = deviceContext.getNextXid();
+ xidMulti = deviceContext.getNextXid();
xidGen = new XidGenerator();
- final SettableFuture<RpcResult<GetAsyncReply>> settableFuture = SettableFuture.create();
- Mockito.when(requestContext.getFuture()).thenReturn(settableFuture);
- deviceContext.hookRequestCtx(deviceContext.getNextXid(), requestContext);
}
@Test(expected=NullPointerException.class)
Assert.assertEquals(wTx, writeTx);
}
+ private static GetAsyncOutput createAsyncOutput(Xid xid) {
+ GetAsyncOutputBuilder asyncOutputBuilder = new GetAsyncOutputBuilder();
+ asyncOutputBuilder.setFlowRemovedMask(Collections.<FlowRemovedMask> emptyList());
+ asyncOutputBuilder.setPacketInMask(Collections.<PacketInMask> emptyList());
+ asyncOutputBuilder.setPortStatusMask(Collections.<PortStatusMask> emptyList());
+ asyncOutputBuilder.setVersion(OFConstants.OFP_VERSION_1_3);
+ asyncOutputBuilder.setXid(xid.getValue());
+ return asyncOutputBuilder.build();
+ }
+
@Test
public void testProcessReply() {
- final Xid xid = xidGen.generate();
- final GetAsyncOutput asyncOutput = createAsyncOutput(xid);
+ GetAsyncOutput asyncOutput = createAsyncOutput(xid);
LOG.info("Hooking RequestContext");
deviceContext.hookRequestCtx(xid, requestContext);
- Assert.assertEquals(requestContext, deviceContext.getRequests().get(xid));
+ Assert.assertEquals(requestContext, deviceContext.getRequests().get(xid.getValue()));
Assert.assertFalse(requestContext.getFuture().isDone());
LOG.info("Sending reply from device");
- deviceContext.processReply(xid, asyncOutput);
+ deviceContext.processReply(asyncOutput);
Assert.assertTrue(requestContext.getFuture().isDone());
LOG.info("Checking RequestContext.future");
try {
- final Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
- final GetAsyncOutput getAsyncOutput = (GetAsyncOutput) object;
+ Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
+ RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
+ GetAsyncOutput getAsyncOutput = (GetAsyncOutput) rpcResult.getResult();
assertEquals(asyncOutput.getVersion(), getAsyncOutput.getVersion());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Test failed when checking RequestContext.future", e);
fail("fail");
}
+ Assert.assertTrue(deviceContext.getRequests().isEmpty());
}
- private GetAsyncOutput createAsyncOutput(final Xid xid) {
- final GetAsyncOutputBuilder asyncOutputBuilder = new GetAsyncOutputBuilder();
- asyncOutputBuilder.setFlowRemovedMask(Collections.<FlowRemovedMask> emptyList());
- asyncOutputBuilder.setPacketInMask(Collections.<PacketInMask> emptyList());
- asyncOutputBuilder.setPortStatusMask(Collections.<PortStatusMask> emptyList());
- asyncOutputBuilder.setVersion(OFConstants.OFP_VERSION_1_3);
- asyncOutputBuilder.setXid(xid.getValue());
- return asyncOutputBuilder.build();
+ private static Error createError(Xid xid) {
+ ErrorMessageBuilder errorMessageBuilder = new ErrorMessageBuilder();
+ errorMessageBuilder.setCode(42);
+ errorMessageBuilder.setCodeString("42");
+ errorMessageBuilder.setXid(xid.getValue());
+ return errorMessageBuilder.build();
+ }
+
+ @Test
+ public void testProcessReplyError() {
+ LOG.info("Hooking RequestContext");
+ deviceContext.hookRequestCtx(xid, requestContext);
+ Assert.assertEquals(requestContext, deviceContext.getRequests().get(xid.getValue()));
+
+ Assert.assertFalse(requestContext.getFuture().isDone());
+ LOG.info("Sending error reply from device");
+ Error error = createError(xid);
+ deviceContext.processReply(error);
+ Assert.assertTrue(requestContext.getFuture().isDone());
+
+ LOG.info("Checking RequestContext.future");
+ try {
+ Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
+ RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
+ Assert.assertFalse(rpcResult.isSuccessful());
+ List<RpcError> errors = (List<RpcError>) rpcResult.getErrors();
+ Assert.assertTrue(errors.get(0).getCause() instanceof DeviceDataException);
+ DeviceDataException cause = (DeviceDataException) errors.get(0).getCause();
+ Assert.assertEquals(error, cause.getError());
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.error("Test failed when checking RequestContext.future", e);
+ fail("fail");
+ }
+ Assert.assertTrue(deviceContext.getRequests().isEmpty());
+ }
+
+ @Test
+ public void testProcessReplyList() {
+ LOG.info("Hooking RequestContext");
+ deviceContext.hookRequestCtx(xidMulti, requestContextMultiReply);
+ Assert.assertEquals(requestContextMultiReply, deviceContext.getRequests().get(xidMulti.getValue()));
+
+ Assert.assertFalse(requestContextMultiReply.getFuture().isDone());
+ LOG.info("Sending reply from device");
+ deviceContext.processReply(xidMulti, createMultipartReplyList(xidMulti));
+ Assert.assertTrue(requestContextMultiReply.getFuture().isDone());
+
+ LOG.info("Checking RequestContext.future");
+ try {
+ Object object = requestContextMultiReply.getFuture().get(1L, TimeUnit.SECONDS);
+ RpcResult<List<OfHeader>> rpcResult = (RpcResult<List<OfHeader>>) object;
+ List<OfHeader> multipartReplies = rpcResult.getResult();
+ List<OfHeader> expectedMpReplies = createMultipartReplyList(xidMulti);
+ assertEquals(expectedMpReplies, multipartReplies);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.error("Test failed when checking RequestContext.future", e);
+ fail("fail");
+ }
+ Assert.assertTrue(deviceContext.getRequests().isEmpty());
+ }
+
+ private static List<OfHeader> createMultipartReplyList(Xid xid) {
+ final MultipartReplyDesc descValue = new MultipartReplyDescBuilder().setHwDesc("hw-test-value").build();
+ final MultipartReplyDescCase replyBody = new MultipartReplyDescCaseBuilder()
+ .setMultipartReplyDesc(descValue).build();
+ List<OfHeader> multipartReplies = new ArrayList<OfHeader>();
+ multipartReplies.add(new MultipartReplyMessageBuilder()
+ .setMultipartReplyBody(replyBody)
+ .setXid(xid.getValue())
+ .setFlags(new MultipartRequestFlags(false))
+ .build());
+ multipartReplies.add(new MultipartReplyMessageBuilder()
+ .setMultipartReplyBody(replyBody)
+ .setXid(xid.getValue())
+ .setFlags(new MultipartRequestFlags(true))
+ .build());
+ return multipartReplies;
+ }
+
+ @Test
+ public void testProcessException() {
+ LOG.info("Hooking RequestContext");
+ deviceContext.hookRequestCtx(xid, requestContext);
+ Assert.assertEquals(requestContext, deviceContext.getRequests().get(xid.getValue()));
+
+ Assert.assertFalse(requestContext.getFuture().isDone());
+
+ LOG.info("Sending reply from device");
+ deviceContext.processException(xid, new DeviceDataException("Some freakin' error", new NullPointerException()));
+ Assert.assertTrue(requestContext.getFuture().isDone());
+
+ LOG.info("Checking RequestContext.future");
+ try {
+ Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
+ RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
+ Assert.assertFalse(rpcResult.isSuccessful());
+ List<RpcError> errors = (List<RpcError>) rpcResult.getErrors();
+ Assert.assertTrue(errors.get(0).getCause() instanceof DeviceDataException);
+ DeviceDataException cause = (DeviceDataException) errors.get(0).getCause();
+ Assert.assertTrue(cause.getCause() instanceof NullPointerException);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.error("Test failed when checking RequestContext.future", e);
+ fail("fail");
+ }
+ Assert.assertTrue(deviceContext.getRequests().isEmpty());
}
}