Changed methods in DeviceContext, created DeviceReplyProcessor interface 28/17328/3
authorTimotej Kubas <tkubas@cisco.com>
Fri, 27 Mar 2015 09:26:53 +0000 (10:26 +0100)
committerTimotej Kubas <tkubas@cisco.com>
Wed, 1 Apr 2015 16:50:03 +0000 (18:50 +0200)
- changed methods in DeviceContext
- .processReply methods excluded into DeviceReplyProcessor interface
- both interfaces implemented in DeviceContextImpl
- added tests

Change-Id: I7a07d94c6c018feef562f5c7f4274871b2a28c69
Signed-off-by: Timotej Kubas <tkubas@cisco.com>
model/model-flow-service/src/main/yang/node-errors.yang
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceReplyProcessor.java [new file with mode: 0644]
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/exception/DeviceDataException.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImplTest.java

index 264c2b82529935d40278838936e8ec84c49de696..ddced8dc8819d51758425a2caa3078b7c0cfb864 100644 (file)
@@ -183,5 +183,9 @@ module node-error {
         uses flow:base-node-error-notification;
         uses flow:node-error-reference;
     }
+    
+    container node-error-container {
+        uses error:error-message;
+    }
 }
 
index 704b1e6e19d53210755538303811ade2fbe7a7f5..91506239dd13ca926115339f3bf9f3882ed69f78 100644 (file)
@@ -14,12 +14,20 @@ import java.util.concurrent.Future;
 import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.MessageHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableFeatures;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
 /**
  * The central entity of OFP is the Device Context, which encapsulate the logical state of a switch
  * as seen by the controller. Each OpenFlow session is tracked by a Connection Context.
@@ -99,25 +107,12 @@ public interface DeviceContext extends MessageHandler {
 
     Xid getNextXid();
 
-    /**
-     * Method provides requests map
-     * @return
-     */
-    public Map<Xid, RequestFutureContext> getRequests();
-
     /**
      * Method writes request context into request context map
      * @param xid
      * @param requestFutureContext
      */
-    public void hookRequestCtx(Xid xid, RequestFutureContext requestFutureContext);
-
-    /**
-     * Method that set future to context in Map
-     * @param xid
-     * @param ofHeader
-     */
-    public void processReply(Xid xid, OfHeader ofHeader);
+    public void hookRequestCtx(Xid xid, RequestContext requestFutureContext);
 
 }
 
diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceReplyProcessor.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/DeviceReplyProcessor.java
new file mode 100644 (file)
index 0000000..6f83bb4
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.device;
+
+import java.util.List;
+
+import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+/**
+ *
+ * @author tkubas
+ *
+ */
+public interface DeviceReplyProcessor {
+
+    /**
+     * Method that set future to context in Map
+     * @param ofHeader
+     */
+    public void processReply(OfHeader ofHeader);
+
+    /**
+     * Method that set future to context in Map
+     * @param xid,
+     * @param ofHeaderList
+     */
+    public void processReply(Xid xid, List<OfHeader> ofHeaderList);
+
+    /**
+     * Method that set exception to the future
+     * @param xid,
+     * @param deviceDataException
+     */
+    public void processException(Xid xid, DeviceDataException deviceDataException);
+
+}
diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/exception/DeviceDataException.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/device/exception/DeviceDataException.java
new file mode 100644 (file)
index 0000000..f7283d2
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.device.exception;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
+
+public class DeviceDataException extends Exception {
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+    private Error error;
+
+    public DeviceDataException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DeviceDataException(String message, Error error) {
+        super(message);
+        this.error= error;
+    }
+
+    public Error getError() {
+        return error;
+    }
+}
index b3b12af5de754e3422d06508ce2374a79e771eb0..d4595e06f8bee6a17fc05756e1ee90edf7016e1b 100644 (file)
@@ -18,29 +18,32 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestFutureContext;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
-import org.opendaylight.openflowplugin.api.openflow.device.XidGenerator;
+import org.opendaylight.openflowplugin.api.openflow.device.*;
+import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.TableFeatures;
 import org.opendaylight.yangtools.yang.binding.ChildOf;
 import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 import java.math.BigInteger;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 /**
  *
  */
-public class DeviceContextImpl implements DeviceContext, TransactionChainListener {
+public class DeviceContextImpl implements DeviceContext, DeviceReplyProcessor, TransactionChainListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
 
@@ -48,8 +51,9 @@ public class DeviceContextImpl implements DeviceContext, TransactionChainListene
     private final DeviceState deviceState;
     private final DataBroker dataBroker;
     private final XidGenerator xidGenerator;
+    private Map<Long, RequestContext> requests =
+            new HashMap<Long, RequestContext>();
 
-    private final Map<Xid, RequestFutureContext> requests;
     private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
     private BindingTransactionChain txChainFactory;
 
@@ -121,32 +125,92 @@ public class DeviceContextImpl implements DeviceContext, TransactionChainListene
         return xidGenerator.generate();
     }
 
-    @Override
-    public Map<Xid, RequestFutureContext> getRequests() {
+    public Map<Long, RequestContext> getRequests() {
         return requests;
     }
 
     @Override
-    public void hookRequestCtx(final Xid xid, final RequestFutureContext requestFutureContext) {
-        requests.put(xid, requestFutureContext);
+    public void hookRequestCtx(Xid xid, RequestContext requestFutureContext) {
+        // TODO Auto-generated method stub
+        requests.put(xid.getValue(), requestFutureContext);
+    }
+
+    @Override
+    public void processReply(OfHeader ofHeader) {
+        RequestContext requestContext = getRequests().get(ofHeader.getXid());
+        SettableFuture replyFuture = requestContext.getFuture();
+        getRequests().remove(ofHeader.getXid());
+        RpcResult<OfHeader> rpcResult;
+
+        if(ofHeader instanceof Error) {
+            Error error = (Error) ofHeader;
+            String message = "Operation on device failed";
+            rpcResult= RpcResultBuilder
+                    .<OfHeader>failed()
+                    .withError(RpcError.ErrorType.APPLICATION, message, new DeviceDataException(message, error))
+                    .build();
+        } else {
+            rpcResult= RpcResultBuilder
+                    .<OfHeader>success()
+                    .withResult(ofHeader)
+                    .build();
+        }
+
+        replyFuture.set(rpcResult);
+        try {
+            requestContext.close();
+        } catch (Exception e) {
+            LOG.error("Closing RequestContext failed: ", e);
+        }
     }
 
     @Override
-    public void processReply(final Xid xid, final OfHeader ofHeader) {
-        final SettableFuture replyFuture = getRequests().get(xid).getFuture();
-        replyFuture.set(ofHeader);
+    public void processReply(Xid xid, List<OfHeader> ofHeaderList) {
+        RequestContext requestContext = getRequests().get(xid.getValue());
+        SettableFuture replyFuture = requestContext.getFuture();
+        getRequests().remove(xid.getValue());
+        RpcResult<List<OfHeader>> rpcResult= RpcResultBuilder
+                                                .<List<OfHeader>>success()
+                                                .withResult(ofHeaderList)
+                                                .build();
+        replyFuture.set(rpcResult);
+        try {
+            requestContext.close();
+        } catch (Exception e) {
+            LOG.error("Closing RequestContext failed: ", e);
+        }
     }
 
     @Override
-    public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
-                                         final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+    public void processException(Xid xid, DeviceDataException deviceDataException) {
+        RequestContext requestContext = getRequests().get(xid.getValue());
+
+        SettableFuture replyFuture = requestContext.getFuture();
+        getRequests().remove(xid.getValue());
+        RpcResult<List<OfHeader>> rpcResult= RpcResultBuilder
+                .<List<OfHeader>>failed()
+                .withError(RpcError.ErrorType.APPLICATION, "Message processing failed", deviceDataException)
+                .build();
+        replyFuture.set(rpcResult);
+        try {
+            requestContext.close();
+        } catch (Exception e) {
+            LOG.error("Closing RequestContext failed: ", e);
+        }
+    }
+
+    @Override
+    public void onTransactionChainFailed(TransactionChain<?, ?> chain,
+            AsyncTransaction<?, ?> transaction, Throwable cause) {
         txChainFactory.close();
         txChainFactory = dataBroker.createTransactionChain(DeviceContextImpl.this);
+
     }
 
     @Override
-    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
-        // NOOP - only yet, here is probably place for notification to get new WriteTransaction
+    public void onTransactionChainSuccessful(TransactionChain<?, ?> chain) {
+     // NOOP - only yet, here is probably place for notification to get new WriteTransaction
+
     }
 
 }
index 841b7be83dbbc5ae903c17dbb87db8a3d28a92bb..ba8d84db1668598d0410a0184ab0347fe827feaa 100644 (file)
@@ -2,11 +2,14 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -25,24 +28,44 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.device.XidGenerator;
+import org.opendaylight.openflowplugin.api.openflow.device.exception.DeviceDataException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartRequestFlags;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.async.body.grouping.FlowRemovedMask;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.async.body.grouping.PacketInMask;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.async.body.grouping.PortStatusMask;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCaseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDesc;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.multipart.reply.desc._case.MultipartReplyDescBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.SettableFuture;
+
 @RunWith(MockitoJUnitRunner.class)
 public class DeviceContextImplTest {
     private static final Logger LOG = LoggerFactory
             .getLogger(DeviceContextImplTest.class);
     XidGenerator xidGen;
+
+    Xid xid;
+    Xid xidMulti;
     DeviceContextImpl deviceContext;
     @Mock
     RequestContext<GetAsyncReply> requestContext;
+    @Mock
+    RequestContext<MultipartReply> requestContextMultiReply;
+
     @Mock
     ConnectionContext connectionContext;
     @Mock
@@ -58,14 +81,18 @@ public class DeviceContextImplTest {
 
     @Before
     public void setUp() {
+        SettableFuture<RpcResult<GetAsyncReply>> settableFuture = SettableFuture.create();
+        SettableFuture<RpcResult<MultipartReply>> settableFutureMultiReply = SettableFuture.create();
+        Mockito.when(requestContext.getFuture()).thenReturn(settableFuture);
+        Mockito.when(requestContextMultiReply.getFuture()).thenReturn(settableFutureMultiReply);
         Mockito.when(txChainFactory.newWriteOnlyTransaction()).thenReturn(wTx);
         Mockito.when(dataBroker.createTransactionChain(Mockito.any(DeviceContextImpl.class))).thenReturn(txChainFactory);
         Mockito.when(dataBroker.newReadOnlyTransaction()).thenReturn(rTx);
+
         deviceContext = new DeviceContextImpl(connectionContext, deviceState, dataBroker);
+        xid = deviceContext.getNextXid();
+        xidMulti = deviceContext.getNextXid();
         xidGen = new XidGenerator();
-        final SettableFuture<RpcResult<GetAsyncReply>> settableFuture = SettableFuture.create();
-        Mockito.when(requestContext.getFuture()).thenReturn(settableFuture);
-        deviceContext.hookRequestCtx(deviceContext.getNextXid(), requestContext);
     }
 
     @Test(expected=NullPointerException.class)
@@ -104,38 +131,146 @@ public class DeviceContextImplTest {
         Assert.assertEquals(wTx, writeTx);
     }
 
+    private static GetAsyncOutput createAsyncOutput(Xid xid) {
+        GetAsyncOutputBuilder asyncOutputBuilder = new GetAsyncOutputBuilder();
+        asyncOutputBuilder.setFlowRemovedMask(Collections.<FlowRemovedMask> emptyList());
+        asyncOutputBuilder.setPacketInMask(Collections.<PacketInMask> emptyList());
+        asyncOutputBuilder.setPortStatusMask(Collections.<PortStatusMask> emptyList());
+        asyncOutputBuilder.setVersion(OFConstants.OFP_VERSION_1_3);
+        asyncOutputBuilder.setXid(xid.getValue());
+        return asyncOutputBuilder.build();
+    }
+
     @Test
     public void testProcessReply() {
-        final Xid xid = xidGen.generate();
-        final GetAsyncOutput asyncOutput = createAsyncOutput(xid);
+        GetAsyncOutput asyncOutput = createAsyncOutput(xid);
         LOG.info("Hooking RequestContext");
         deviceContext.hookRequestCtx(xid, requestContext);
-        Assert.assertEquals(requestContext, deviceContext.getRequests().get(xid));
+        Assert.assertEquals(requestContext, deviceContext.getRequests().get(xid.getValue()));
 
         Assert.assertFalse(requestContext.getFuture().isDone());
         LOG.info("Sending reply from device");
-        deviceContext.processReply(xid, asyncOutput);
+        deviceContext.processReply(asyncOutput);
         Assert.assertTrue(requestContext.getFuture().isDone());
 
         LOG.info("Checking RequestContext.future");
         try {
-            final Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
-            final GetAsyncOutput getAsyncOutput = (GetAsyncOutput) object;
+            Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
+            RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
+            GetAsyncOutput getAsyncOutput = (GetAsyncOutput) rpcResult.getResult();
             assertEquals(asyncOutput.getVersion(), getAsyncOutput.getVersion());
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             LOG.error("Test failed when checking RequestContext.future", e);
             fail("fail");
         }
+        Assert.assertTrue(deviceContext.getRequests().isEmpty());
     }
 
-    private GetAsyncOutput createAsyncOutput(final Xid xid) {
-        final GetAsyncOutputBuilder asyncOutputBuilder = new GetAsyncOutputBuilder();
-        asyncOutputBuilder.setFlowRemovedMask(Collections.<FlowRemovedMask> emptyList());
-        asyncOutputBuilder.setPacketInMask(Collections.<PacketInMask> emptyList());
-        asyncOutputBuilder.setPortStatusMask(Collections.<PortStatusMask> emptyList());
-        asyncOutputBuilder.setVersion(OFConstants.OFP_VERSION_1_3);
-        asyncOutputBuilder.setXid(xid.getValue());
-        return asyncOutputBuilder.build();
+    private static Error createError(Xid xid) {
+        ErrorMessageBuilder errorMessageBuilder = new ErrorMessageBuilder();
+        errorMessageBuilder.setCode(42);
+        errorMessageBuilder.setCodeString("42");
+        errorMessageBuilder.setXid(xid.getValue());
+        return errorMessageBuilder.build();
+    }
+
+    @Test
+    public void testProcessReplyError() {
+        LOG.info("Hooking RequestContext");
+        deviceContext.hookRequestCtx(xid, requestContext);
+        Assert.assertEquals(requestContext, deviceContext.getRequests().get(xid.getValue()));
+
+        Assert.assertFalse(requestContext.getFuture().isDone());
+        LOG.info("Sending error reply from device");
+        Error error = createError(xid);
+        deviceContext.processReply(error);
+        Assert.assertTrue(requestContext.getFuture().isDone());
+
+        LOG.info("Checking RequestContext.future");
+        try {
+            Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
+            RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
+            Assert.assertFalse(rpcResult.isSuccessful());
+            List<RpcError> errors = (List<RpcError>) rpcResult.getErrors();
+            Assert.assertTrue(errors.get(0).getCause() instanceof DeviceDataException);
+            DeviceDataException cause = (DeviceDataException) errors.get(0).getCause();
+            Assert.assertEquals(error, cause.getError());
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Test failed when checking RequestContext.future", e);
+            fail("fail");
+        }
+        Assert.assertTrue(deviceContext.getRequests().isEmpty());
+    }
+
+    @Test
+    public void testProcessReplyList() {
+        LOG.info("Hooking RequestContext");
+        deviceContext.hookRequestCtx(xidMulti, requestContextMultiReply);
+        Assert.assertEquals(requestContextMultiReply, deviceContext.getRequests().get(xidMulti.getValue()));
+
+        Assert.assertFalse(requestContextMultiReply.getFuture().isDone());
+        LOG.info("Sending reply from device");
+        deviceContext.processReply(xidMulti, createMultipartReplyList(xidMulti));
+        Assert.assertTrue(requestContextMultiReply.getFuture().isDone());
+
+        LOG.info("Checking RequestContext.future");
+        try {
+            Object object = requestContextMultiReply.getFuture().get(1L, TimeUnit.SECONDS);
+            RpcResult<List<OfHeader>> rpcResult = (RpcResult<List<OfHeader>>) object;
+            List<OfHeader> multipartReplies = rpcResult.getResult();
+            List<OfHeader> expectedMpReplies = createMultipartReplyList(xidMulti);
+            assertEquals(expectedMpReplies, multipartReplies);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.error("Test failed when checking RequestContext.future", e);
+            fail("fail");
+        }
+        Assert.assertTrue(deviceContext.getRequests().isEmpty());
+    }
+
+    private static List<OfHeader> createMultipartReplyList(Xid xid) {
+        final MultipartReplyDesc descValue = new MultipartReplyDescBuilder().setHwDesc("hw-test-value").build();
+        final MultipartReplyDescCase replyBody = new MultipartReplyDescCaseBuilder()
+                                                        .setMultipartReplyDesc(descValue).build();
+        List<OfHeader> multipartReplies = new ArrayList<OfHeader>();
+        multipartReplies.add(new MultipartReplyMessageBuilder()
+                                    .setMultipartReplyBody(replyBody)
+                                    .setXid(xid.getValue())
+                                    .setFlags(new MultipartRequestFlags(false))
+                                    .build());
+        multipartReplies.add(new MultipartReplyMessageBuilder()
+                                    .setMultipartReplyBody(replyBody)
+                                    .setXid(xid.getValue())
+                                    .setFlags(new MultipartRequestFlags(true))
+                                    .build());
+        return multipartReplies;
+    }
+
+    @Test
+    public void testProcessException() {
+        LOG.info("Hooking RequestContext");
+        deviceContext.hookRequestCtx(xid, requestContext);
+        Assert.assertEquals(requestContext, deviceContext.getRequests().get(xid.getValue()));
+
+        Assert.assertFalse(requestContext.getFuture().isDone());
+
+        LOG.info("Sending reply from device");
+        deviceContext.processException(xid, new DeviceDataException("Some freakin' error", new NullPointerException()));
+        Assert.assertTrue(requestContext.getFuture().isDone());
+
+        LOG.info("Checking RequestContext.future");
+        try {
+                Object object = requestContext.getFuture().get(1L, TimeUnit.SECONDS);
+                RpcResult<OfHeader> rpcResult = (RpcResult<OfHeader>) object;
+                Assert.assertFalse(rpcResult.isSuccessful());
+                List<RpcError> errors = (List<RpcError>) rpcResult.getErrors();
+                Assert.assertTrue(errors.get(0).getCause() instanceof DeviceDataException);
+                DeviceDataException cause = (DeviceDataException) errors.get(0).getCause();
+                Assert.assertTrue(cause.getCause() instanceof NullPointerException);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                LOG.error("Test failed when checking RequestContext.future", e);
+                fail("fail");
+        }
+        Assert.assertTrue(deviceContext.getRequests().isEmpty());
     }
 
 }