extended integration tests with OFLibrary, improved error handling for threads 85/2285/1
authorMichal Rehak <mirehak@cisco.com>
Wed, 30 Oct 2013 15:47:36 +0000 (16:47 +0100)
committerMichal Rehak <mirehak@cisco.com>
Thu, 31 Oct 2013 12:55:04 +0000 (13:55 +0100)
Change-Id: I7bc1f752ec0a97f69401b7a2f3612dd370189a98
Signed-off-by: Michal Rehak <mirehak@cisco.com>
openflowplugin-it/src/test/java/org/opendaylight/openflowplugin/openflow/md/it/OFPluginToLibraryTest.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ErrorQueueHandler.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/OFSessionUtil.java

index 96e50e8e9fd6fce2af868fec8d047d5ab9fa75c7..ca8f76b2e233a3fcc7b219f89310b7806763392d 100644 (file)
@@ -5,13 +5,22 @@ import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
 import static org.ops4j.pax.exam.CoreOptions.options;
 import static org.ops4j.pax.exam.CoreOptions.systemProperty;
 
+import java.util.Stack;
+import java.util.concurrent.TimeUnit;
+
 import javax.inject.Inject;
 
+import junit.framework.Assert;
+
+import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.opendaylight.openflowjava.protocol.impl.clients.ClientEvent;
 import org.opendaylight.openflowjava.protocol.impl.clients.ScenarioFactory;
 import org.opendaylight.openflowjava.protocol.impl.clients.ScenarioHandler;
+import org.opendaylight.openflowjava.protocol.impl.clients.SendEvent;
 import org.opendaylight.openflowjava.protocol.impl.clients.SimpleClient;
+import org.opendaylight.openflowjava.protocol.impl.clients.SleepEvent;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
 import org.ops4j.pax.exam.Configuration;
 import org.ops4j.pax.exam.Option;
@@ -50,21 +59,100 @@ public class OFPluginToLibraryTest {
     @Inject
     BundleContext ctx;
 
+    private SimpleClient switchSim;
+
+    @After
+    public void tearDown() {
+        try {
+            LOG.debug("tearing down simulator");
+            switchSim.getScenarioDone().get(getFailSafeTimeout(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            String msg = "waiting for scenario to finish failed: "+e.getMessage();
+            LOG.error(msg, e);
+            Assert.fail(msg);
+        }
+    }
+
+    /**
+     * test basic integration with OFLib running the handshake
+     * @throws Exception
+     */
+    @Test
+    public void handshakeOk() throws Exception {
+        LOG.debug("handshake integration test");
+        LOG.debug("switchConnectionProvider: "+switchConnectionProvider);
+
+        switchSim = new SimpleClient("localhost", 6653);
+        switchSim.setSecuredClient(false);
+        Stack<ClientEvent> handshakeScenario = ScenarioFactory.createHandshakeScenario();
+        
+        ScenarioHandler scenario = new ScenarioHandler(handshakeScenario);
+        switchSim.setScenarioHandler(scenario);
+        switchSim.start();
+        try {
+            switchSim.getScenarioDone().get(getFailSafeTimeout(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            String msg = "waiting for scenario to finish failed: "+e.getMessage();
+            LOG.error(msg, e);
+            Assert.fail(msg);
+        }
+        //TODO: dump errors of plugin
+    }
+    
+    /**
+     * test basic integration with OFLib running the handshake
+     * @throws Exception
+     */
+    @Test
+    public void handshakeFail1() throws Exception {
+        LOG.debug("handshake integration test");
+        LOG.debug("switchConnectionProvider: "+switchConnectionProvider);
+
+        switchSim = new SimpleClient("localhost", 6653);
+        switchSim.setSecuredClient(false);
+        Stack<ClientEvent> handshakeScenario = ScenarioFactory.createHandshakeScenario();
+        SendEvent featuresReply = new SendEvent(new byte[] {4, 6, 0, 32, 0, 0, 0, 3, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 1, 1, 0, 0, 0, 1, 2, 3, 0, 1, 2, 3});
+        handshakeScenario.setElementAt(featuresReply, 0);
+        
+        ScenarioHandler scenario = new ScenarioHandler(handshakeScenario);
+        switchSim.setScenarioHandler(scenario);
+        switchSim.start();
+        try {
+            switchSim.getScenarioDone().get(getFailSafeTimeout(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            String msg = "waiting for scenario to finish failed: "+e.getMessage();
+            LOG.error(msg, e);
+            Assert.fail(msg);
+        }
+        //TODO: dump errors of plugin
+    }
+    
     /**
      * test basic integration with OFLib running the handshake
      * @throws Exception
      */
     @Test
-    public void handshake() throws Exception {
+    public void handshakeFail2() throws Exception {
         LOG.debug("handshake integration test");
         LOG.debug("switchConnectionProvider: "+switchConnectionProvider);
 
-        SimpleClient switchSim = new SimpleClient("localhost", 6653);
+        switchSim = new SimpleClient("localhost", 6653);
         switchSim.setSecuredClient(false);
-        ScenarioHandler scenario = new ScenarioHandler(ScenarioFactory.createHandshakeScenario());
+        Stack<ClientEvent> handshakeScenario = ScenarioFactory.createHandshakeScenario();
+        handshakeScenario.setElementAt(new SleepEvent(5000), 0);
+        
+        ScenarioHandler scenario = new ScenarioHandler(handshakeScenario);
         switchSim.setScenarioHandler(scenario);
         switchSim.start();
-        switchSim.getScenarioDone().get();
+        tearDown();
+        //TODO: dump errors of plugin
+    }
+
+    /**
+     * @return timeout for case of failure
+     */
+    private static long getFailSafeTimeout() {
+        return 20000;
     }
 
 
index 509a94c9f4430f7e1883032ade81282af451b688..266201878145bccb6a50a7133ee9fab8db1cc6d6 100644 (file)
@@ -48,8 +48,8 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 
 /**
  * @author mirehak
@@ -261,7 +261,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         LOG.debug("version set: " + proposedVersion);
         // request features
         GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
-            featuresBuilder.setVersion(version).setXid(xid);
+        featuresBuilder.setVersion(version).setXid(xid);
         LOG.debug("sending feature request for version={} and xid={}", version, xid);
         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
                 .getFeatures(featuresBuilder.build());
@@ -275,8 +275,10 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                         , rpcFeatures.getErrors());
             } else {
                 GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
-                LOG.debug("obtained features: datapathId={}"
-                        , featureOutput.getDatapathId());
+                LOG.debug("obtained features: datapathId={}",
+                        featureOutput.getDatapathId());
+                LOG.debug("obtained features: auxiliaryId={}",
+                        featureOutput.getAuxiliaryId());
                 conductorState = CONDUCTOR_STATE.WORKING;
 
                 OFSessionUtil.registerSession(this,
@@ -285,7 +287,10 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                 LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
             }
         } catch (Exception e) {
+            //handshake failed
+            LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
             handleException(e);
+            disconnect();
         }
     }
 
@@ -301,8 +306,9 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
      * @param e
      */
     private void handleException(Exception e) {
+        Exception causeAndThread = new Exception("IN THREAD: "+Thread.currentThread().getName(), e);
         try {
-            errorQueue.put(e);
+            errorQueue.put(causeAndThread);
         } catch (InterruptedException e1) {
             LOG.error(e1.getMessage(), e1);
         }
@@ -334,6 +340,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
             // idle state in any other conductorState than WORKING means real
             // problem and wont be handled by echoReply, but disconnection
+            disconnect();
             OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
         } else {
             LOG.debug("first idle state occured");
@@ -347,7 +354,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
             try {
                 // TODO: read timeout from config
-                RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(5,
+                RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
                         TimeUnit.SECONDS);
                 if (echoReplyValue.isSuccessful()) {
                     conductorState = CONDUCTOR_STATE.WORKING;
@@ -358,10 +365,15 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                                 "while receiving echoReply in TIMEOUTING state: "
                                         + cause.getMessage(), cause);
                     }
+                    //switch issue occurred
+                    throw new Exception("switch issue occurred");
                 }
             } catch (Exception e) {
                 LOG.error("while waiting for echoReply in TIMEOUTING state: "
                         + e.getMessage(), e);
+                //switch is not responding
+                disconnect();
+                OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
             }
         }
     }
@@ -457,7 +469,17 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public Future<Boolean> disconnect() {
-        return connectionAdapter.disconnect();
+        LOG.info("disconnecting: sessionCtx="+sessionContext+"|auxId="+auxiliaryKey);
+        
+        Future<Boolean> result = null;
+        if (connectionAdapter.isAlive()) {
+            result = connectionAdapter.disconnect();
+        } else {
+            LOG.debug("connection already disconnected");
+            result = Futures.immediateFuture(true);
+        }
+        
+        return result; 
     }
 
     @Override
index c3836e565a922750b798bf829d9a40948eec5c03..d715f046ed2ee718d51cf7b4af926205fc2c3d85 100644 (file)
@@ -37,7 +37,8 @@ public class ErrorQueueHandler implements Runnable {
             Exception error;
             try {
                 error = errorQueue.take();
-                LOG.error(error.getMessage(), error);
+                Throwable cause = error.getCause();
+                LOG.error(error.getMessage()+" -> "+cause.getMessage(), cause);
             } catch (InterruptedException e) {
                 LOG.warn(e.getMessage());
             }
index a4c20a47ecccac64f41828c2c25763a31518732b..a7e367396a22be88088c4750e79806bd94a39164 100644 (file)
@@ -37,8 +37,7 @@ public abstract class OFSessionUtil {
             GetFeaturesOutput features, short version) {
         SwitchConnectionDistinguisher sessionKey = createSwitchSessionKey(features
                 .getDatapathId());
-        SessionContext sessionContext = getSessionManager().getSessionContext(
-                sessionKey);
+        SessionContext sessionContext = getSessionManager().getSessionContext(sessionKey);
 
         if (features.getAuxiliaryId() == 0) {
             // handle primary
@@ -59,7 +58,7 @@ public abstract class OFSessionUtil {
         } else {
             // handle auxiliary
             if (sessionContext == null) {
-                LOG.warn("unexpected auxiliary connection - primary connection missing: "
+                throw new IllegalStateException("unexpected auxiliary connection - primary connection missing: "
                         + dumpDataPathId(features.getDatapathId()));
             } else {
                 // register auxiliary conductor into existing sessionContext
@@ -80,6 +79,16 @@ public abstract class OFSessionUtil {
                 connectionConductor.setConnectionCookie(auxiliaryKey);
             }
         }
+        
+        // check registration result
+        SessionContext resulContext = getSessionManager().getSessionContext(sessionKey);
+        if (resulContext == null) {
+            throw new IllegalStateException("session context registration failed");
+        } else {
+            if (!resulContext.isValid()) {
+                throw new IllegalStateException("registered session context is invalid");
+            }
+        }
     }
 
     /**