site improvement, warnings removal, applied queueKeeper, fixed tests, errorHandler... 73/2573/1
authorMichal Rehak <mirehak@cisco.com>
Thu, 7 Nov 2013 16:04:35 +0000 (17:04 +0100)
committerMichal Rehak <mirehak@cisco.com>
Sat, 9 Nov 2013 22:54:02 +0000 (23:54 +0100)
Change-Id: I44071a4bb381150679e15cc15ea2ee4d8683317f
Signed-off-by: Michal Rehak <mirehak@cisco.com>
commons/pom.xml
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductor.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ErrorHandler.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ErrorHandlerQueueImpl.java [moved from openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ErrorQueueHandler.java with 51% similarity]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java

index ee91bf3e58f6608f7fa20b8dcf680ba3894ce876..572208c47e73c75224e601507cd41a6458169f81 100755 (executable)
         <id>findbugsReport</id>
         <reporting>
           <plugins>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-project-info-reports-plugin</artifactId>
+              <version>2.7</version>
+              <configuration>
+                <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
+                <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+              </configuration>
+              <reportSets>
+                <reportSet>
+                  <reports>
+                    <report>index</report>
+                    <report>project-team</report>
+                    <report>license</report>
+                    <report>mailing-list</report>
+                    <report>plugin-management</report>
+                    <report>cim</report>
+                    <report>issue-tracking</report>
+                    <report>scm</report>
+                    <report>summary</report>
+                  </reports>
+                </reportSet>
+              </reportSets>
+            </plugin>
             <plugin>
               <groupId>org.codehaus.mojo</groupId>
               <artifactId>findbugs-maven-plugin</artifactId>
               <version>2.5.2</version>
             </plugin>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-pmd-plugin</artifactId>
+              <version>3.0.1</version>
+            </plugin>
+            <plugin>
+              <groupId>org.apache.maven.plugins</groupId>
+              <artifactId>maven-jxr-plugin</artifactId>
+              <version>2.3</version>
+            </plugin>
           </plugins>
         </reporting>
       </profile>
index db39d72f1e05b335bf1606b0d83d6b0b44f34ea7..c7c3b5cba231cdff52f49b9bc8eb0e7df78a0479 100644 (file)
@@ -103,4 +103,9 @@ public interface ConnectionConductor {
      */
     void setQueueKeeper(QueueKeeper<Object> queueKeeper);
 
+    /**
+     * @param errorHandler for internal exception handling
+     */
+    void setErrorHandler(ErrorHandler errorHandler);
+
 }
index 70fa95b5af4640206066e72df4df57267fd6f79d..cfc4011453ff8c9f4d79b1d5ac34cb7031d625b4 100644 (file)
@@ -8,15 +8,9 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
@@ -24,7 +18,6 @@ import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
@@ -45,7 +38,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
@@ -67,9 +59,9 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
      * BitMaps from switches.
      */
     private static final boolean isBitmapNegotiationEnable = true;
-    private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
+    private ErrorHandler errorHandler;
 
-    protected final ConnectionAdapter connectionAdapter;
+    private final ConnectionAdapter connectionAdapter;
     private ConnectionConductor.CONDUCTOR_STATE conductorState;
     private Short version;
 
@@ -77,8 +69,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     private SessionContext sessionContext;
 
-    private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
-
     protected boolean isFirstHelloNegotiation = true;
 
     // TODO: use appropriate interface instead of Object
@@ -92,7 +82,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
         this.connectionAdapter = connectionAdapter;
         conductorState = CONDUCTOR_STATE.HANDSHAKING;
-        new Thread(new ErrorQueueHandler(errorQueue)).start();
     }
 
     @Override
@@ -110,6 +99,13 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         this.queueKeeper = queueKeeper;
     }
 
+    /**
+     * @param errorHandler the errorHandler to set
+     */
+    @Override
+    public void setErrorHandler(ErrorHandler errorHandler) {
+        this.errorHandler = errorHandler;
+    }
 
     /**
      * send first hello message to switch
@@ -122,7 +118,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         if (isBitmapNegotiationEnable) {
             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
             LOG.debug("sending first hello message: vertsion header={} , version bitmap={}", 
-                    highestVersion, helloInput.getElements());
+                    highestVersion, MessageFactory.digVersions(helloInput.getElements()));
         } else {
             helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
             LOG.debug("sending first hello message: version header={} ", highestVersion);
@@ -134,7 +130,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
             LOG.debug("FIRST HELLO sent.");
         } catch (Throwable e) {
             LOG.debug("FIRST HELLO sending failed.");
-            handleException(e);
+            errorHandler.handleException(e, getSessionContext());
         }
     }
 
@@ -149,7 +145,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                 builder.setXid(echoRequestMessage.getXid());
                 builder.setData(echoRequestMessage.getData());
                 
-                connectionAdapter.echoReply(builder.build());
+                getConnectionAdapter().echoReply(builder.build());
             }
         }).start();            
     }
@@ -157,18 +153,16 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     @Override
     public void onErrorMessage(ErrorMessage errorMessage) {
         queueKeeper.push(ErrorMessage.class, errorMessage, this);
-       // notifyListeners(ErrorMessage.class, errorMessage);
     }
 
     @Override
     public void onExperimenterMessage(ExperimenterMessage experimenterMessage) {
         queueKeeper.push(ExperimenterMessage.class, experimenterMessage, this);
-//        notifyListeners(ExperimenterMessage.class, experimenterMessage);
     }
 
     @Override
     public void onFlowRemovedMessage(FlowRemovedMessage message) {
-        notifyListeners(FlowRemovedMessage.class, message);
+        queueKeeper.push(FlowRemovedMessage.class, message, this);
     }
 
 
@@ -197,14 +191,14 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                 List<Elements> elements = hello.getElements();
                 Long xid = hello.getXid();
                 Short proposedVersion;
-                LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
+                LOG.debug("Hello message version={} and bitmap={}", remoteVersion, MessageFactory.digVersions(elements));
                 try {
                     // find the version from header version field
                     proposedVersion = proposeVersion(remoteVersion);
                     
                 } catch (IllegalArgumentException e) {
-                    handleException(e);
-                    connectionAdapter.disconnect();
+                    errorHandler.handleException(e, getSessionContext());
+                    getConnectionAdapter().disconnect();
                     return;
                 }
                 
@@ -220,8 +214,8 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                         // version in bitmap
                         proposedVersion = proposeBitmapVersion(elements);
                     } catch (IllegalArgumentException ex) {
-                        handleException(ex);
-                        connectionAdapter.disconnect();
+                        errorHandler.handleException(ex, getSessionContext());
+                        getConnectionAdapter().disconnect();
                         return;
                     }
                     LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
@@ -236,7 +230,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                     } else {
                         // terminate the connection.
                         LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
-                        connectionAdapter.disconnect();
+                        getConnectionAdapter().disconnect();
                     }
                 }
             }
@@ -256,7 +250,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
             result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
             smokeRpc(result);
         } catch (Throwable e) {
-            handleException(e);
+            errorHandler.handleException(e, getSessionContext());
         }
     }
 
@@ -310,12 +304,11 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
             OFSessionUtil.registerSession(this,
                     featureOutput, version);
-            this.setListenerMapping(OFSessionUtil.getListenersMap());
             LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
         } catch (Throwable e) {
             //handshake failed
             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
-            handleException(e);
+            errorHandler.handleException(e, getSessionContext());
             disconnect();
         }
     }
@@ -323,7 +316,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     /**
      * @return rpc-response timeout in [ms]
      */
-    private long getMaxTimeout() {
+    protected long getMaxTimeout() {
         // TODO:: get from configuration
         return 2000;
     }
@@ -331,93 +324,77 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     /**
      * @return milliseconds
      */
-    private TimeUnit getMaxTimeoutUnit() {
+    protected TimeUnit getMaxTimeoutUnit() {
         // TODO:: get from configuration
         return TimeUnit.MILLISECONDS;
     }
 
-
-    /**
-     * @param e
-     */
-    protected void handleException(Throwable e) {
-        String sessionKeyId = null;
-        if (getSessionContext() != null) {
-            sessionKeyId = Arrays.toString(getSessionContext().getSessionKey().getId());
-        }
-        
-        Exception causeAndThread = new Exception(
-                "IN THREAD: "+Thread.currentThread().getName() +
-                "; session:"+sessionKeyId, e);
-        try {
-            errorQueue.put(causeAndThread);
-        } catch (InterruptedException e1) {
-            LOG.error(e1.getMessage(), e1);
-        }
-    }
-
     @Override
-    public void onMultipartReplyMessage(MultipartReplyMessage arg0) {
-        // TODO Auto-generated method stub
+    public void onMultipartReplyMessage(MultipartReplyMessage message) {
+        queueKeeper.push(MultipartReplyMessage.class, message, this);
     }
 
     @Override
-    public void onMultipartRequestMessage(MultipartRequestMessage arg0) {
-        // TODO Auto-generated method stub
+    public void onMultipartRequestMessage(MultipartRequestMessage message) {
+        queueKeeper.push(MultipartRequestMessage.class, message, this);
     }
 
     @Override
     public void onPacketInMessage(PacketInMessage message) {
-        notifyListeners(PacketInMessage.class, message);
+        queueKeeper.push(PacketInMessage.class, message, this);
     }
 
     @Override
     public void onPortStatusMessage(PortStatusMessage message) {
         this.getSessionContext().processPortStatusMsg(message);
-        notifyListeners(PortStatusMessage.class, message);
+        queueKeeper.push(PortStatusMessage.class, message, this);
     }
 
     @Override
     public void onSwitchIdleEvent(SwitchIdleEvent notification) {
-        if (!CONDUCTOR_STATE.WORKING.equals(conductorState)) {
-            // idle state in any other conductorState than WORKING means real
-            // problem and wont be handled by echoReply, but disconnection
-            disconnect();
-            OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
-        } else {
-            LOG.debug("first idle state occured");
-            EchoInputBuilder builder = new EchoInputBuilder();
-            builder.setVersion(version);
-            // TODO: get xid from sessionContext
-            builder.setXid(42L);
-
-            Future<RpcResult<EchoOutput>> echoReplyFuture = connectionAdapter
-                    .echo(builder.build());
-
-            try {
-                // TODO: read timeout from config
-                RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
-                        getMaxTimeoutUnit());
-                if (echoReplyValue.isSuccessful()) {
-                    conductorState = CONDUCTOR_STATE.WORKING;
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                if (!CONDUCTOR_STATE.WORKING.equals(getConductorState())) {
+                    // idle state in any other conductorState than WORKING means real
+                    // problem and wont be handled by echoReply, but disconnection
+                    disconnect();
+                    OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
                 } else {
-                    for (RpcError replyError : echoReplyValue.getErrors()) {
-                        Throwable cause = replyError.getCause();
-                        LOG.error(
-                                "while receiving echoReply in TIMEOUTING state: "
-                                        + cause.getMessage(), cause);
+                    LOG.debug("first idle state occured");
+                    EchoInputBuilder builder = new EchoInputBuilder();
+                    builder.setVersion(getVersion());
+                    builder.setXid(getSessionContext().getNextXid());
+                    
+                    Future<RpcResult<EchoOutput>> echoReplyFuture = getConnectionAdapter()
+                            .echo(builder.build());
+                    
+                    try {
+                        RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
+                                getMaxTimeoutUnit());
+                        if (echoReplyValue.isSuccessful()) {
+                            setConductorState(CONDUCTOR_STATE.WORKING);
+                        } else {
+                            for (RpcError replyError : echoReplyValue.getErrors()) {
+                                Throwable cause = replyError.getCause();
+                                LOG.error(
+                                        "while receiving echoReply in TIMEOUTING state: "
+                                                + cause.getMessage(), cause);
+                            }
+                            //switch issue occurred
+                            throw new Exception("switch issue occurred");
+                        }
+                    } catch (Exception e) {
+                        LOG.error("while waiting for echoReply in TIMEOUTING state: "
+                                + e.getMessage(), e);
+                        //switch is not responding
+                        disconnect();
+                        OFSessionUtil.getSessionManager().invalidateOnDisconnect(ConnectionConductorImpl.this);
                     }
-                    //switch issue occurred
-                    throw new Exception("switch issue occurred");
                 }
-            } catch (Exception e) {
-                LOG.error("while waiting for echoReply in TIMEOUTING state: "
-                        + e.getMessage(), e);
-                //switch is not responding
-                disconnect();
-                OFSessionUtil.getSessionManager().invalidateOnDisconnect(this);
             }
-        }
+            
+        }).start();
     }
 
     /**
@@ -544,36 +521,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         return sessionContext;
     }
     
-    /**
-     * @param listenerMapping the listenerMapping to set
-     */
-    public void setListenerMapping(
-            Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
-        this.listenerMapping = listenerMapping;
-    }
-
-    /**
-     * @param messageType
-     * @param message
-     * @deprecated use {@link QueueKeeper} strategy
-     */
-    @Deprecated
-    private void notifyListeners(Class<? extends DataObject> messageType, DataObject message) {
-        Collection<IMDMessageListener> listeners = listenerMapping.get(messageType);
-        if (listeners != null) {
-                for (IMDMessageListener listener : listeners) {
-                    // Pass cookie only for PACKT_IN
-                    if ( messageType.equals("PacketInMessage.class")){
-                        listener.receive(this.getAuxiliaryKey(), this.getSessionContext(), message);
-                    } else {
-                        listener.receive(null, this.getSessionContext(), message);
-                    }
-                }
-        } else {
-            LOG.warn("No listeners for this message Type {}", messageType);
-        }
-    }
-
     @Override
     public ConnectionAdapter getConnectionAdapter() {
         return connectionAdapter;
@@ -582,7 +529,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     @Override
     public void onConnectionReady() {
         LOG.debug("connection is ready-to-use");
-        //TODO: fire first helloMessage
         new Thread(new Runnable() {
             @Override
             public void run() {
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ErrorHandler.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ErrorHandler.java
new file mode 100644 (file)
index 0000000..d31372a
--- /dev/null
@@ -0,0 +1,24 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+
+/**
+ * @author mirehak
+ *
+ */
+public interface ErrorHandler {
+
+    /**
+     * @param e cause
+     * @param sessionContext of source
+     */
+    void handleException(Throwable e, SessionContext sessionContext);
+
+}
similarity index 51%
rename from openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ErrorQueueHandler.java
rename to openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ErrorHandlerQueueImpl.java
index d715f046ed2ee718d51cf7b4af926205fc2c3d85..f27226d02daa9a5f5e01ea6e2ecedc76eca7fe69 100644 (file)
@@ -8,8 +8,10 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import java.util.Arrays;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -17,18 +19,18 @@ import org.slf4j.LoggerFactory;
  * dumping all exceptions to log
  * @author mirehak
  */
-public class ErrorQueueHandler implements Runnable {
+public class ErrorHandlerQueueImpl implements Runnable, ErrorHandler {
 
     private static final Logger LOG = LoggerFactory
-            .getLogger(ErrorQueueHandler.class);
+            .getLogger(ErrorHandlerQueueImpl.class);
 
     private LinkedBlockingQueue<Exception> errorQueue;
 
     /**
-     * @param errorQueue
+     * default ctor
      */
-    public ErrorQueueHandler(LinkedBlockingQueue<Exception> errorQueue) {
-        this.errorQueue = errorQueue;
+    public ErrorHandlerQueueImpl() {
+        this.errorQueue = new LinkedBlockingQueue<>();
     }
 
     @Override
@@ -44,4 +46,25 @@ public class ErrorQueueHandler implements Runnable {
             }
         }
     }
+    
+    /**
+     * @param e
+     * @param sessionContext TODO
+     */
+    @Override
+    public void handleException(Throwable e, SessionContext sessionContext) {
+        String sessionKeyId = null;
+        if (sessionContext != null) {
+            sessionKeyId = Arrays.toString(sessionContext.getSessionKey().getId());
+        }
+        
+        Exception causeAndThread = new Exception(
+                "IN THREAD: "+Thread.currentThread().getName() +
+                "; session:"+sessionKeyId, e);
+        try {
+            errorQueue.put(causeAndThread);
+        } catch (InterruptedException e1) {
+            LOG.error(e1.getMessage(), e1);
+        }
+    }
 }
index 1ba75ed1c32a1c537e5a4d6356f1ae0fd7e3b44e..04e7bfa174ecfa781db26987afd460309ed721d0 100644 (file)
@@ -48,7 +48,7 @@ public class MDController implements IMDController {
 
     public void init() {
         LOG.debug("Initializing!");
-        this.messageListeners = new ConcurrentHashMap<Class<? extends DataObject>, Collection<IMDMessageListener>>();
+        this.messageListeners = new ConcurrentHashMap<>();
         // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually
         OFSessionUtil.getSessionManager().setListenerMapping(messageListeners);
     }
index 2fc0db0c0af3186679fd28db589a8e0cd70caa95..6266e22ef194be59d8d4def841b3083d1feafbb4 100644 (file)
@@ -77,4 +77,20 @@ public abstract class MessageFactory {
         }
         return helloInputbuilder.build();
     }
+
+    /**
+     * @param elements
+     * @return version boolean list
+     */
+    public static List<Boolean> digVersions(List<Elements> elements) {
+        List<Boolean> result = null;
+        if (elements != null && !elements.isEmpty()) {
+            for (Elements elm : elements) {
+                if (HelloElementType.VERSIONBITMAP.equals(elm.getType())) {
+                    result = elm.getVersionBitmap();
+                }
+            }
+        }
+        return result;
+    }
 }
index ff1beb58d78be2a2cbc3c475410a2d887224cbc7..f26375bf5a28d78f155d841a31574868d064cfcb 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
 public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
     
     private QueueKeeperLightImpl<Object> queueKeeper;
+    private ErrorHandler errorHandler;
 
     /**
      * 
@@ -32,6 +33,8 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
         queueKeeper.init();
         // TODO: add pop-listeners consuming object processed via queue
         //queueKeeper.addPopListener(listener);
+        
+        errorHandler = new ErrorHandlerQueueImpl();
     }
 
     @Override
@@ -44,6 +47,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
     public void onSwitchConnected(ConnectionAdapter connectionAdapter) {
         ConnectionConductor conductor = ConnectionConductorFactory.createConductor(
                 connectionAdapter, queueKeeper);
+        conductor.setErrorHandler(errorHandler);
     }
 
 }
index aa61a61846f7424e15cb09b35f0dc85b2258b409..e1f8e41149bfa129056007105c1713609e79cceb 100644 (file)
@@ -62,6 +62,9 @@ public class ConnectionConductorImplTest {
     protected static final Logger LOG = LoggerFactory
             .getLogger(ConnectionConductorImplTest.class);
 
+    /** in [ms] */
+    private final int maxProcessingTimeout = 500;
+    
     protected ConnectionAdapterStackImpl adapter;
     private ConnectionConductorImpl connectionConductor;
     private MDController controller;
@@ -122,7 +125,6 @@ public class ConnectionConductorImplTest {
         popListener = new PopListenerCountingImpl<>();
         
         queueKeeper = new QueueKeeperLightImpl<>();
-        queueKeeper.setListenerMapping(assembleListenerMapping());
         queueKeeper.init();
         queueKeeper.addPopListener(popListener);
         
@@ -131,10 +133,13 @@ public class ConnectionConductorImplTest {
         connectionConductor.init();
         controller = new MDController();
         controller.init();
+        queueKeeper.setListenerMapping(controller.getMessageListeners());
         eventPlan = new Stack<>();
         adapter.setEventPlan(eventPlan);
         adapter.setProceedTimeout(5000L);
         adapter.checkListeners();
+        
+        controller.getMessageListeners().putAll(assembleListenerMapping());
     }
 
     /**
@@ -281,8 +286,6 @@ public class ConnectionConductorImplTest {
         eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
                 EventFactory.DEFAULT_VERSION, builder1));
         
-        connectionConductor.setListenerMapping(assembleListenerMapping());
-        
         executeLater();
 
         Runnable sendExperimenterCmd = new Runnable() {
@@ -364,9 +367,15 @@ public class ConnectionConductorImplTest {
         FlowRemovedMessageBuilder builder1 = new FlowRemovedMessageBuilder();
         builder1.setXid(1L);
         connectionConductor.onFlowRemovedMessage(builder1.build());
+        synchronized (popListener) {
+            popListener.wait(maxProcessingTimeout);
+        }
         Assert.assertEquals(1, flowremovedMessageCounter);
         builder1.setXid(2L);
         connectionConductor.onFlowRemovedMessage(builder1.build());
+        synchronized (popListener) {
+            popListener.wait(maxProcessingTimeout);
+        }
         Assert.assertEquals(2, flowremovedMessageCounter);
     }
 
@@ -424,9 +433,15 @@ public class ConnectionConductorImplTest {
         PacketInMessageBuilder builder1 = new PacketInMessageBuilder();
         builder1.setBufferId((long)1);
         connectionConductor.onPacketInMessage(builder1.build());
+        synchronized (popListener) {
+            popListener.wait(maxProcessingTimeout);
+        }
         Assert.assertEquals(1, packetinMessageCounter);
         builder1.setBufferId((long)2);
         connectionConductor.onPacketInMessage(builder1.build());
+        synchronized (popListener) {
+            popListener.wait(maxProcessingTimeout);
+        }
         Assert.assertEquals(2, packetinMessageCounter);
     }
 
@@ -464,12 +479,21 @@ public class ConnectionConductorImplTest {
         PortFeatures features = new PortFeatures(true,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRADD).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
+        synchronized (popListener) {
+            popListener.wait(maxProcessingTimeout);
+        }
         Assert.assertEquals(1, portstatusAddMessageCounter);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRMODIFY).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
+        synchronized (popListener) {
+            popListener.wait(maxProcessingTimeout);
+        }
         Assert.assertEquals(1, portstatusModifyMessageCounter);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRDELETE).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
+        synchronized (popListener) {
+            popListener.wait(maxProcessingTimeout);
+        }
         Assert.assertEquals(1, portstatusDeleteMessageCounter);
     }
 
@@ -792,9 +816,6 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnExperimenterMessage() throws InterruptedException {
-        final int maxProcessingTimeout = 500;
-        
-        connectionConductor.setListenerMapping(assembleListenerMapping());
         ExperimenterMessageBuilder builder1 = new ExperimenterMessageBuilder();
         builder1.setExperimenter(84L).setExpType(4L);
         connectionConductor.onExperimenterMessage(builder1.build());
@@ -819,8 +840,6 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnErrorMessage() throws InterruptedException {
-        final int maxProcessingTimeout = 500;
-        connectionConductor.setListenerMapping(assembleListenerMapping());
         ErrorMessageBuilder builder1 = new ErrorMessageBuilder();
         builder1.setCode(100);
         connectionConductor.onErrorMessage(builder1.build());
@@ -837,7 +856,11 @@ public class ConnectionConductorImplTest {
     }
 
     /**
-     * @return listener mapping
+     * @return listener mapping for :
+     * <ul>
+     * <li>experimenter</li>
+     * <li>error</li>
+     * </ul>
      */
     private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> assembleListenerMapping() {
         IMDMessageListener objEms = new ExperimenterMessageService() ;
index e988277146b558d95fcf3ddbb4fd24650ea6a054..19567c48c8de390f0b8cd5b3c518e0dd6f1b446f 100644 (file)
@@ -16,6 +16,7 @@ import org.junit.Test;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.ErrorHandler;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeper;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
@@ -368,6 +369,10 @@ class MockConnectionConductor implements ConnectionConductor {
     public void setQueueKeeper(QueueKeeper<Object> queueKeeper) {
         // do nothing yet
     }
+
+    @Override
+    public void setErrorHandler(ErrorHandler errorHandler) {
+    }
 }
 
 enum MessageType {