Cleanup: use Java 8 lambdas
[netvirt.git] / openstack / net-virt / src / main / java / org / opendaylight / netvirt / openstack / netvirt / impl / EventDispatcherImpl.java
1 /*
2  * Copyright (c) 2013, 2015 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.openstack.netvirt.impl;
10
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import org.opendaylight.netvirt.openstack.netvirt.AbstractEvent;
13 import org.opendaylight.netvirt.openstack.netvirt.AbstractHandler;
14 import org.opendaylight.netvirt.openstack.netvirt.ConfigInterface;
15 import org.opendaylight.netvirt.openstack.netvirt.api.Constants;
16 import org.opendaylight.netvirt.openstack.netvirt.api.EventDispatcher;
17
18 import org.osgi.framework.ServiceReference;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.ThreadFactory;
28
29 public class EventDispatcherImpl implements EventDispatcher, ConfigInterface {
30     private static final Logger LOG = LoggerFactory.getLogger(EventDispatcher.class);
31     private ExecutorService eventHandler;
32     private volatile BlockingQueue<AbstractEvent> events;
33     private AbstractHandler[] handlers;
34
35     public EventDispatcherImpl() {
36         events = new LinkedBlockingQueue<>();
37         handlers = new AbstractHandler[AbstractEvent.HandlerType.size];
38         ThreadFactory threadFactory = new ThreadFactoryBuilder()
39             .setNameFormat("NV-EvtDsptchr-%d").build();
40         eventHandler = Executors.newSingleThreadExecutor(threadFactory);
41         start();
42     }
43
44     void start() {
45         eventHandler.submit(() -> {
46             Thread t = Thread.currentThread();
47             t.setName("EventDispatcherImpl");
48             LOG.info("EventDispatcherImpl: started {}", t.getName());
49             while (true) {
50                 AbstractEvent ev;
51                 try {
52                     ev = events.take();
53                 } catch (InterruptedException e) {
54                     LOG.info("The event handler thread was interrupted, shutting down", e);
55                     return;
56                 }
57                 try {
58                     dispatchEvent(ev);
59                 } catch (Exception e) {
60                     LOG.error("Exception in dispatching event {}", ev.toString(), e);
61                 }
62             }
63         });
64         LOG.debug("event dispatcher is started");
65     }
66
67     void stop() {
68         // stop accepting new tasks
69         eventHandler.shutdown();
70         try {
71             // Wait a while for existing tasks to terminate
72             if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
73                 eventHandler.shutdownNow();
74                 // Wait a while for tasks to respond to being cancelled
75                 if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
76                     LOG.error("Dispatcher's event handler did not terminate");
77                 }
78             }
79         } catch (InterruptedException e) {
80             // (Re-)Cancel if current thread also interrupted
81             eventHandler.shutdownNow();
82             // Preserve interrupt status
83             Thread.currentThread().interrupt();
84         }
85         LOG.debug("event dispatcher is stopped");
86     }
87
88     private void dispatchEvent(AbstractEvent ev) {
89         LOG.trace("dispatchEvent: Processing (id={}): Event : {}", ev.getTransactionId(), ev);
90         AbstractHandler handler = handlers[ev.getHandlerType().ordinal()];
91         if (handler == null) {
92             LOG.warn("event dispatcher found no handler for : {}", ev);
93             return;
94         }
95
96         handler.processEvent(ev);
97         LOG.trace("dispatchEvent: Done processing (id={}): Event : {}", ev.getTransactionId(), ev);
98     }
99
100     public void eventHandlerAdded(final ServiceReference ref, AbstractHandler handler){
101         Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
102         Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
103         if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
104             // The exception should give us a stacktrace
105             LOG.error("Abstract handler reg failed to provide a valid handler type: {} ref: {} handler: {}",
106                     handlerTypeObject, ref.getClass().getName(), handler.getClass().getName(),
107                     new IllegalArgumentException("Missing handler type"));
108             return;
109         }
110         AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
111         handlers[handlerType.ordinal()] = handler;
112         LOG.debug("eventHandlerAdded: handler: {}, pid: {}, type: {}",
113                 handler.getClass().getName(), pid, handlerType);
114     }
115
116     public void eventHandlerRemoved(final ServiceReference ref){
117         Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
118         Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
119         if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
120             LOG.error("Abstract handler unreg failed to provide a valid handler type {}", handlerTypeObject);
121             return;
122         }
123         AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
124         handlers[handlerType.ordinal()] = null;
125         LOG.debug("Event handler for type {} unregistered pid {}", handlerType, pid);
126     }
127
128     /**
129      * Enqueue the event.
130      *
131      * @param event the {@link AbstractEvent} event to be handled.
132      */
133     @Override
134     public void enqueueEvent(AbstractEvent event) {
135         if (event == null) {
136             LOG.warn("enqueueEvent: event is null");
137             return;
138         }
139
140         try {
141             events.put(event);
142         } catch (InterruptedException e) {
143             LOG.error("Thread was interrupted while trying to enqueue event ", e);
144         }
145     }
146
147     @Override
148     public void setDependencies(ServiceReference serviceReference) {}
149
150     @Override
151     public void setDependencies(Object impl) {}
152 }