BUG-1254: fix concurrent add/remove session test
[openflowplugin.git] / openflowplugin / src / main / java / org / opendaylight / openflowplugin / openflow / md / core / session / SessionManagerOFImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.session;
10
11 import com.google.common.util.concurrent.ListeningExecutorService;
12
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Map.Entry;
18 import java.util.concurrent.ConcurrentHashMap;
19
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
22 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
23 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
24 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
25 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionListener;
26 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
27 import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
28 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
29 import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
30 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
31 import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.util.ListenerRegistry;
35 import org.opendaylight.yangtools.yang.binding.DataContainer;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * @author mirehak
42  */
43 public class SessionManagerOFImpl implements ConjunctSessionManager {
44
45     protected static final Logger LOG = LoggerFactory.getLogger(SessionManagerOFImpl.class);
46     private static SessionManagerOFImpl instance;
47     private ConcurrentHashMap<SwitchSessionKeyOF, SessionContext> sessionLot;
48     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
49     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping;
50
51     protected ListenerRegistry<SessionListener> sessionListeners;
52     private NotificationProviderService notificationProviderService;
53
54     private DataBroker dataBroker;
55     private ListeningExecutorService rpcPool;
56
57
58     /**
59      * @return singleton instance
60      */
61     public static ConjunctSessionManager getInstance() {
62         if (instance == null) {
63             synchronized (SessionContextOFImpl.class) {
64                 if (instance == null) {
65                     instance = new SessionManagerOFImpl();
66                 }
67             }
68         }
69         return instance;
70     }
71
72     /**
73      * close and release singleton instace
74      */
75     public static void releaseInstance() {
76         if (instance != null) {
77             synchronized (SessionManagerOFImpl.class) {
78                 if (instance != null) {
79                     instance.close();
80                     instance = null;
81                 }
82             }
83         }
84     }
85
86     private SessionManagerOFImpl() {
87         LOG.debug("singleton creating");
88         sessionLot = new ConcurrentHashMap<>();
89         sessionListeners = new ListenerRegistry<>();
90     }
91
92     @Override
93     public SessionContext getSessionContext(SwitchSessionKeyOF sessionKey) {
94         return sessionLot.get(sessionKey);
95     }
96
97     @Override
98     public void invalidateSessionContext(SwitchSessionKeyOF sessionKey) {
99         SessionContext context = getSessionContext(sessionKey);
100         if (context == null) {
101             LOG.warn("context for invalidation not found");
102         } else {
103             synchronized (context) {
104                 for (Entry<SwitchConnectionDistinguisher, ConnectionConductor> auxEntry : context.getAuxiliaryConductors()) {
105                     invalidateAuxiliary(sessionKey, auxEntry.getKey());
106                 }
107                 context.getPrimaryConductor().disconnect();
108                 context.setValid(false);
109                 removeSessionContext(context);
110                 // TODO:: notify listeners
111             }
112         }
113     }
114
115     private void invalidateDeadSessionContext(SessionContext sessionContext) {
116         if (sessionContext == null) {
117             LOG.warn("context for invalidation not found");
118         } else {
119             synchronized (sessionContext) {
120                 for (Entry<SwitchConnectionDistinguisher, ConnectionConductor> auxEntry : sessionContext
121                         .getAuxiliaryConductors()) {
122                     invalidateAuxiliary(sessionContext, auxEntry.getKey(), true);
123                 }
124                 sessionContext.setValid(false);
125                 removeSessionContext(sessionContext);
126                 // TODO:: notify listeners
127             }
128         }
129     }
130
131     private void removeSessionContext(SessionContext sessionContext) {
132         if (LOG.isDebugEnabled()) {
133             LOG.debug("removing session: {}", Arrays.toString(sessionContext.getSessionKey().getId()));
134         }
135         sessionLot.remove(sessionContext.getSessionKey(), sessionContext);
136         sessionNotifier.onSessionRemoved(sessionContext);
137     }
138
139     @Override
140     public void addSessionContext(SwitchSessionKeyOF sessionKey, SessionContext context) {
141         synchronized (context) {
142             sessionLot.put(sessionKey, context);
143             sessionNotifier.onSessionAdded(sessionKey, context);
144
145             context.setValid(true);
146         }
147     }
148
149     @Override
150     public void invalidateAuxiliary(SwitchSessionKeyOF sessionKey,
151                                     SwitchConnectionDistinguisher connectionCookie) {
152         SessionContext context = getSessionContext(sessionKey);
153         invalidateAuxiliary(context, connectionCookie, true);
154     }
155
156     /**
157      * @param context
158      * @param connectionCookie
159      * @param disconnect       true if auxiliary connection is to be disconnected
160      */
161     private static void invalidateAuxiliary(SessionContext context, SwitchConnectionDistinguisher connectionCookie,
162                                             boolean disconnect) {
163         if (context == null) {
164             LOG.warn("context for invalidation not found");
165         } else {
166             ConnectionConductor auxiliaryConductor = context.removeAuxiliaryConductor(connectionCookie);
167             if (auxiliaryConductor == null) {
168                 LOG.warn("auxiliary conductor not found");
169             } else {
170                 if (disconnect) {
171                     auxiliaryConductor.disconnect();
172                 }
173             }
174         }
175     }
176
177     @Override
178     public void invalidateOnDisconnect(ConnectionConductor conductor) {
179         if (conductor.getAuxiliaryKey() == null) {
180             invalidateDeadSessionContext(conductor.getSessionContext());
181             // TODO:: notify listeners
182         } else {
183             invalidateAuxiliary(conductor.getSessionContext(), conductor.getAuxiliaryKey(), false);
184         }
185     }
186
187     @Override
188     public void setTranslatorMapping(Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping) {
189         this.translatorMapping = translatorMapping;
190     }
191
192     @Override
193     public ListenerRegistration<SessionListener> registerSessionListener(SessionListener listener) {
194         LOG.debug("registerSessionListener");
195         return sessionListeners.register(listener);
196     }
197
198     private final SessionListener sessionNotifier = new SessionListener() {
199
200         @Override
201         public void onSessionAdded(SwitchSessionKeyOF sessionKey, SessionContext context) {
202             for (ListenerRegistration<SessionListener> listener : sessionListeners) {
203                 try {
204                     listener.getInstance().onSessionAdded(sessionKey, context);
205                 } catch (Exception e) {
206                     LOG.error("Unhandled exeption occured while invoking onSessionAdded on listener", e);
207                 }
208             }
209         }
210
211         @Override
212         public void onSessionRemoved(SessionContext context) {
213             for (ListenerRegistration<SessionListener> listener : sessionListeners) {
214                 try {
215                     listener.getInstance().onSessionRemoved(context);
216                 } catch (Exception e) {
217                     LOG.error("Unhandled exeption occured while invoking onSessionRemoved on listener", e);
218                 }
219             }
220         }
221     };
222     private MessageSpy<DataContainer> messageSpy;
223     private ExtensionConverterProvider extensionConverterProvider;
224
225
226     @Override
227     public Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> getTranslatorMapping() {
228         return this.translatorMapping;
229     }
230
231     @Override
232     public void setNotificationProviderService(
233             NotificationProviderService notificationProviderService) {
234         this.notificationProviderService = notificationProviderService;
235
236     }
237
238     @Override
239     public DataBroker getDataBroker() {
240         return dataBroker;
241     }
242
243     @Override
244     public void setDataBroker(DataBroker dataBroker) {
245         this.dataBroker = dataBroker;
246
247     }
248
249     @Override
250     public NotificationProviderService getNotificationProviderService() {
251         return notificationProviderService;
252     }
253
254     @Override
255     public Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> getPopListenerMapping() {
256         return popListenerMapping;
257     }
258
259     @Override
260     public void setPopListenerMapping(
261             Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping) {
262         this.popListenerMapping = popListenerMapping;
263     }
264
265     @Override
266     public void close() {
267         LOG.debug("close");
268         synchronized (sessionLot) {
269             for (SessionContext sessionContext : sessionLot.values()) {
270                 sessionContext.getPrimaryConductor().disconnect();
271             }
272             // TODO: handle timeouted shutdown
273             rpcPool.shutdown();
274         }
275         
276         for (ListenerRegistration<SessionListener> listenerRegistration : sessionListeners) {
277             SessionListener listener = listenerRegistration.getInstance();
278             if (listener instanceof AutoCloseable) {
279                 try {
280                     ((AutoCloseable) listener).close();
281                 } catch (Exception e) {
282                     LOG.warn("closing of sessionListenerRegistration failed", e);
283                 }
284             }
285         }
286     }
287
288     @Override
289     public void setRpcPool(ListeningExecutorService rpcPool) {
290         this.rpcPool = rpcPool;
291     }
292
293     @Override
294     public ListeningExecutorService getRpcPool() {
295         return rpcPool;
296     }
297
298     @Override
299     public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
300         this.messageSpy = messageSpy;
301     }
302
303     @Override
304     public MessageSpy<DataContainer> getMessageSpy() {
305         return messageSpy;
306     }
307
308     @Override
309     public void setExtensionConverterProvider(
310             ExtensionConverterProvider extensionConverterProvider) {
311         this.extensionConverterProvider = extensionConverterProvider;
312     }
313
314     /**
315      * @return the extensionConverterProvider
316      */
317     @Override
318     public ExtensionConverterProvider getExtensionConverterProvider() {
319         return extensionConverterProvider;
320     }
321 }