Merge "Bug 8873 - Bundle based reconciliation to enable bundling of messages"
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / SessionManagerOFImpl.java
index a314018d777939e29bbbeea057bb2a3bce23a5ff..67cc3efce7e2abb1867c77a02ca0dd48e38ce6b2 100644 (file)
@@ -8,35 +8,37 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core.session;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
-import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
-import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
-import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
-import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
-import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionListener;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
+import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-
 /**
  * @author mirehak
  */
-public class SessionManagerOFImpl implements SessionManager {
+public class SessionManagerOFImpl implements ConjunctSessionManager {
 
     protected static final Logger LOG = LoggerFactory.getLogger(SessionManagerOFImpl.class);
     private static SessionManagerOFImpl instance;
@@ -47,14 +49,14 @@ public class SessionManagerOFImpl implements SessionManager {
     protected ListenerRegistry<SessionListener> sessionListeners;
     private NotificationProviderService notificationProviderService;
 
-    private DataProviderService dataProviderService;
+    private DataBroker dataBroker;
     private ListeningExecutorService rpcPool;
-    
+
 
     /**
      * @return singleton instance
      */
-    public static SessionManager getInstance() {
+    public static ConjunctSessionManager getInstance() {
         if (instance == null) {
             synchronized (SessionContextOFImpl.class) {
                 if (instance == null) {
@@ -64,9 +66,9 @@ public class SessionManagerOFImpl implements SessionManager {
         }
         return instance;
     }
-    
+
     /**
-     * close and release singleton instace
+     * close and release singleton instance
      */
     public static void releaseInstance() {
         if (instance != null) {
@@ -94,29 +96,44 @@ public class SessionManagerOFImpl implements SessionManager {
     public void invalidateSessionContext(SwitchSessionKeyOF sessionKey) {
         SessionContext context = getSessionContext(sessionKey);
         if (context == null) {
-            LOG.warn("context for invalidation not found");
+            LOG.info("context for invalidation not found");
         } else {
-            for (Entry<SwitchConnectionDistinguisher, ConnectionConductor> auxEntry : context.getAuxiliaryConductors()) {
-                invalidateAuxiliary(sessionKey, auxEntry.getKey());
+            synchronized (context) {
+                if (context.isValid()) {
+                    for (Entry<SwitchConnectionDistinguisher, ConnectionConductor> auxEntry : context.getAuxiliaryConductors()) {
+                        invalidateAuxiliary(sessionKey, auxEntry.getKey());
+                    }
+                    context.getPrimaryConductor().disconnect();
+                    context.setValid(false);
+                    removeSessionContext(context);
+                    // TODO:: notify listeners
+                } else {
+                    LOG.warn("Ignore invalid session context: {}",
+                             Arrays.toString(sessionKey.getId()));
+                }
             }
-            context.getPrimaryConductor().disconnect();
-            context.setValid(false);
-            removeSessionContext(context);
-            // TODO:: notify listeners
         }
     }
 
     private void invalidateDeadSessionContext(SessionContext sessionContext) {
         if (sessionContext == null) {
-            LOG.warn("context for invalidation not found");
+            LOG.info("context for invalidation not found");
         } else {
-            for (Entry<SwitchConnectionDistinguisher, ConnectionConductor> auxEntry : sessionContext
-                    .getAuxiliaryConductors()) {
-                invalidateAuxiliary(sessionContext, auxEntry.getKey(), true);
+            synchronized (sessionContext) {
+                if (sessionContext.isValid()) {
+                    for (Entry<SwitchConnectionDistinguisher, ConnectionConductor> auxEntry : sessionContext
+                             .getAuxiliaryConductors()) {
+                        invalidateAuxiliary(sessionContext, auxEntry.getKey(), true);
+                    }
+                    sessionContext.setValid(false);
+                    removeSessionContext(sessionContext);
+                    // TODO:: notify listeners
+                } else {
+                    LOG.warn("Ignore invalid dead session context: {}",
+                             Arrays.toString(
+                                 sessionContext.getSessionKey().getId()));
+                }
             }
-            sessionContext.setValid(false);
-            removeSessionContext(sessionContext);
-            // TODO:: notify listeners
         }
     }
 
@@ -124,21 +141,31 @@ public class SessionManagerOFImpl implements SessionManager {
         if (LOG.isDebugEnabled()) {
             LOG.debug("removing session: {}", Arrays.toString(sessionContext.getSessionKey().getId()));
         }
-        sessionLot.remove(sessionContext.getSessionKey(), sessionContext);
-        sessionNotifier.onSessionRemoved(sessionContext);
+        if (sessionLot.remove(sessionContext.getSessionKey(), sessionContext)) {
+            sessionNotifier.onSessionRemoved(sessionContext);
+        } else {
+            // This should never happen.
+            LOG.warn("Ignore session context that was already removed: {}",
+                     Arrays.toString(sessionContext.getSessionKey().getId()));
+        }
     }
 
     @Override
     public void addSessionContext(SwitchSessionKeyOF sessionKey, SessionContext context) {
-        sessionLot.put(sessionKey, context);
-
-        sessionNotifier.onSessionAdded(sessionKey, context);
-
+        synchronized (context) {
+            sessionLot.put(sessionKey, context);
+            sessionNotifier.onSessionAdded(sessionKey, context);
+            context.setValid(true);
+        }
     }
 
+    @Override
+    public void setRole(SessionContext context) {
+       sessionNotifier.setRole(context);
+    }
     @Override
     public void invalidateAuxiliary(SwitchSessionKeyOF sessionKey,
-            SwitchConnectionDistinguisher connectionCookie) {
+                                    SwitchConnectionDistinguisher connectionCookie) {
         SessionContext context = getSessionContext(sessionKey);
         invalidateAuxiliary(context, connectionCookie, true);
     }
@@ -146,13 +173,12 @@ public class SessionManagerOFImpl implements SessionManager {
     /**
      * @param context
      * @param connectionCookie
-     * @param disconnect
-     *            true if auxiliary connection is to be disconnected
+     * @param disconnect       true if auxiliary connection is to be disconnected
      */
     private static void invalidateAuxiliary(SessionContext context, SwitchConnectionDistinguisher connectionCookie,
-            boolean disconnect) {
+                                            boolean disconnect) {
         if (context == null) {
-            LOG.warn("context for invalidation not found");
+            LOG.info("context for invalidation not found");
         } else {
             ConnectionConductor auxiliaryConductor = context.removeAuxiliaryConductor(connectionCookie);
             if (auxiliaryConductor == null) {
@@ -199,6 +225,17 @@ public class SessionManagerOFImpl implements SessionManager {
             }
         }
 
+        @Override
+        public void setRole(SessionContext context) {
+            for (ListenerRegistration<SessionListener> listener : sessionListeners) {
+                try {
+                    listener.getInstance().setRole(context);
+                } catch (Exception e) {
+                    LOG.error("Unhandled exeption occured while invoking setRole on listener", e);
+                }
+            }
+        }
+
         @Override
         public void onSessionRemoved(SessionContext context) {
             for (ListenerRegistration<SessionListener> listener : sessionListeners) {
@@ -211,7 +248,8 @@ public class SessionManagerOFImpl implements SessionManager {
         }
     };
     private MessageSpy<DataContainer> messageSpy;
-    
+    private ExtensionConverterProvider extensionConverterProvider;
+
 
     @Override
     public Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> getTranslatorMapping() {
@@ -226,13 +264,13 @@ public class SessionManagerOFImpl implements SessionManager {
     }
 
     @Override
-    public DataProviderService getDataProviderService() {
-        return dataProviderService;
+    public DataBroker getDataBroker() {
+        return dataBroker;
     }
 
     @Override
-    public void setDataProviderService(DataProviderService dataServiceProvider) {
-        this.dataProviderService = dataServiceProvider;
+    public void setDataBroker(DataBroker dataBroker) {
+        this.dataBroker = dataBroker;
 
     }
 
@@ -251,11 +289,10 @@ public class SessionManagerOFImpl implements SessionManager {
             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping) {
         this.popListenerMapping = popListenerMapping;
     }
-    
+
     @Override
     public void close() {
         LOG.debug("close");
-        sessionListeners = null;
         synchronized (sessionLot) {
             for (SessionContext sessionContext : sessionLot.values()) {
                 sessionContext.getPrimaryConductor().disconnect();
@@ -263,25 +300,55 @@ public class SessionManagerOFImpl implements SessionManager {
             // TODO: handle timeouted shutdown
             rpcPool.shutdown();
         }
+
+        for (ListenerRegistration<SessionListener> listenerRegistration : sessionListeners) {
+            SessionListener listener = listenerRegistration.getInstance();
+            if (listener instanceof AutoCloseable) {
+                try {
+                    ((AutoCloseable) listener).close();
+                } catch (Exception e) {
+                    LOG.warn("closing of sessionListenerRegistration failed", e);
+                }
+            }
+        }
     }
 
     @Override
     public void setRpcPool(ListeningExecutorService rpcPool) {
         this.rpcPool = rpcPool;
     }
-    
+
     @Override
     public ListeningExecutorService getRpcPool() {
         return rpcPool;
     }
-    
+
     @Override
     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
         this.messageSpy = messageSpy;
     }
-    
+
     @Override
     public MessageSpy<DataContainer> getMessageSpy() {
         return messageSpy;
     }
+
+    @Override
+    public void setExtensionConverterProvider(
+            ExtensionConverterProvider extensionConverterProvider) {
+        this.extensionConverterProvider = extensionConverterProvider;
+    }
+
+    /**
+     * @return the extensionConverterProvider
+     */
+    @Override
+    public ExtensionConverterProvider getExtensionConverterProvider() {
+        return extensionConverterProvider;
+    }
+
+    @Override
+    public Collection<SessionContext> getAllSessions() {
+        return sessionLot.values();
+    }
 }