BUG-956 deadlock by rpc invocation - phase2
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / SessionManagerOFImpl.java
index 01b87ff4203bbd8e192476a3f673127e03f7889d..afda07fc38961ec1c24156b9a0082811d47cfd36 100644 (file)
@@ -8,23 +8,46 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core.session;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 /**
  * @author mirehak
  */
 public class SessionManagerOFImpl implements SessionManager {
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(SessionManagerOFImpl.class);
+    protected static final Logger LOG = LoggerFactory.getLogger(SessionManagerOFImpl.class);
     private static SessionManagerOFImpl instance;
-    private ConcurrentHashMap<SwitchConnectionDistinguisher, SessionContext> sessionLot;
+    private ConcurrentHashMap<SwitchSessionKeyOF, SessionContext> sessionLot;
+    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+    private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping;
+
+    protected ListenerRegistry<SessionListener> sessionListeners;
+    private NotificationProviderService notificationProviderService;
+
+    private DataProviderService dataProviderService;
+    private ListeningExecutorService rpcPool;
+    
 
     /**
      * @return singleton instance
@@ -35,31 +58,38 @@ public class SessionManagerOFImpl implements SessionManager {
         }
         return instance;
     }
+    
+    /**
+     * close and release singleton instace
+     */
+    public static synchronized void releaseInstance() {
+        instance.close();
+        instance = null;
+    }
 
     private SessionManagerOFImpl() {
+        LOG.debug("singleton creating");
         sessionLot = new ConcurrentHashMap<>();
+        sessionListeners = new ListenerRegistry<>();
     }
 
     @Override
-    public SessionContext getSessionContext(
-            SwitchConnectionDistinguisher sessionKey) {
+    public SessionContext getSessionContext(SwitchSessionKeyOF sessionKey) {
         return sessionLot.get(sessionKey);
     }
 
     @Override
-    public void invalidateSessionContext(
-            SwitchConnectionDistinguisher sessionKey) {
+    public void invalidateSessionContext(SwitchSessionKeyOF sessionKey) {
         SessionContext context = getSessionContext(sessionKey);
         if (context == null) {
             LOG.warn("context for invalidation not found");
         } else {
-            for (Entry<SwitchConnectionDistinguisher, ConnectionConductor> auxEntry : context
-                    .getAuxiliaryConductors()) {
+            for (Entry<SwitchConnectionDistinguisher, ConnectionConductor> auxEntry : context.getAuxiliaryConductors()) {
                 invalidateAuxiliary(sessionKey, auxEntry.getKey());
             }
             context.getPrimaryConductor().disconnect();
             context.setValid(false);
-            sessionLot.remove(sessionKey);
+            removeSessionContext(context);
             // TODO:: notify listeners
         }
     }
@@ -73,20 +103,29 @@ public class SessionManagerOFImpl implements SessionManager {
                 invalidateAuxiliary(sessionContext, auxEntry.getKey(), true);
             }
             sessionContext.setValid(false);
-            sessionLot.remove(sessionContext.getSessionKey(), sessionContext);
+            removeSessionContext(sessionContext);
             // TODO:: notify listeners
         }
     }
 
+    private void removeSessionContext(SessionContext sessionContext) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("removing session: {}", Arrays.toString(sessionContext.getSessionKey().getId()));
+        }
+        sessionLot.remove(sessionContext.getSessionKey(), sessionContext);
+        sessionNotifier.onSessionRemoved(sessionContext);
+    }
+
     @Override
-    public void addSessionContext(SwitchConnectionDistinguisher sessionKey,
-            SessionContext context) {
+    public void addSessionContext(SwitchSessionKeyOF sessionKey, SessionContext context) {
         sessionLot.put(sessionKey, context);
-        // TODO:: notify listeners
+
+        sessionNotifier.onSessionAdded(sessionKey, context);
+
     }
 
     @Override
-    public void invalidateAuxiliary(SwitchConnectionDistinguisher sessionKey,
+    public void invalidateAuxiliary(SwitchSessionKeyOF sessionKey,
             SwitchConnectionDistinguisher connectionCookie) {
         SessionContext context = getSessionContext(sessionKey);
         invalidateAuxiliary(context, connectionCookie, true);
@@ -98,13 +137,12 @@ public class SessionManagerOFImpl implements SessionManager {
      * @param disconnect
      *            true if auxiliary connection is to be disconnected
      */
-    private static void invalidateAuxiliary(SessionContext context,
-            SwitchConnectionDistinguisher connectionCookie, boolean disconnect) {
+    private static void invalidateAuxiliary(SessionContext context, SwitchConnectionDistinguisher connectionCookie,
+            boolean disconnect) {
         if (context == null) {
             LOG.warn("context for invalidation not found");
         } else {
-            ConnectionConductor auxiliaryConductor = context
-                    .removeAuxiliaryConductor(connectionCookie);
+            ConnectionConductor auxiliaryConductor = context.removeAuxiliaryConductor(connectionCookie);
             if (auxiliaryConductor == null) {
                 LOG.warn("auxiliary conductor not found");
             } else {
@@ -121,8 +159,106 @@ public class SessionManagerOFImpl implements SessionManager {
             invalidateDeadSessionContext(conductor.getSessionContext());
             // TODO:: notify listeners
         } else {
-            invalidateAuxiliary(conductor.getSessionContext(),
-                    conductor.getAuxiliaryKey(), false);
+            invalidateAuxiliary(conductor.getSessionContext(), conductor.getAuxiliaryKey(), false);
+        }
+    }
+
+    @Override
+    public void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
+        this.translatorMapping = translatorMapping;
+    }
+
+    @Override
+    public ListenerRegistration<SessionListener> registerSessionListener(SessionListener listener) {
+        LOG.debug("registerSessionListener");
+        return sessionListeners.register(listener);
+    }
+
+    private final SessionListener sessionNotifier = new SessionListener() {
+
+        @Override
+        public void onSessionAdded(SwitchSessionKeyOF sessionKey, SessionContext context) {
+            for (ListenerRegistration<SessionListener> listener : sessionListeners) {
+                try {
+                    listener.getInstance().onSessionAdded(sessionKey, context);
+                } catch (Exception e) {
+                    LOG.error("Unhandled exeption occured while invoking onSessionAdded on listener", e);
+                }
+            }
         }
+
+        @Override
+        public void onSessionRemoved(SessionContext context) {
+            for (ListenerRegistration<SessionListener> listener : sessionListeners) {
+                try {
+                    listener.getInstance().onSessionRemoved(context);
+                } catch (Exception e) {
+                    LOG.error("Unhandled exeption occured while invoking onSessionRemoved on listener", e);
+                }
+            }
+        }
+    };
+    
+
+    @Override
+    public Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> getTranslatorMapping() {
+        return this.translatorMapping;
+    }
+
+    @Override
+    public void setNotificationProviderService(
+            NotificationProviderService notificationProviderService) {
+        this.notificationProviderService = notificationProviderService;
+
+    }
+
+    @Override
+    public DataProviderService getDataProviderService() {
+        return dataProviderService;
+    }
+
+    @Override
+    public void setDataProviderService(DataProviderService dataServiceProvider) {
+        this.dataProviderService = dataServiceProvider;
+
+    }
+
+    @Override
+    public NotificationProviderService getNotificationProviderService() {
+        return notificationProviderService;
+    }
+
+    @Override
+    public Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> getPopListenerMapping() {
+        return popListenerMapping;
+    }
+
+    @Override
+    public void setPopListenerMapping(
+            Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping) {
+        this.popListenerMapping = popListenerMapping;
+    }
+    
+    @Override
+    public void close() {
+        LOG.debug("close");
+        sessionListeners = null;
+        synchronized (sessionLot) {
+            for (SessionContext sessionContext : sessionLot.values()) {
+                sessionContext.getPrimaryConductor().disconnect();
+            }
+            // TODO: handle timeouted shutdown
+            rpcPool.shutdown();
+        }
+    }
+
+    @Override
+    public void setRpcPool(ListeningExecutorService rpcPool) {
+        this.rpcPool = rpcPool;
+    }
+    
+    @Override
+    public ListeningExecutorService getRpcPool() {
+        return rpcPool;
     }
 }