Initial pass at changing groupId
[netvirt.git] / openstack / net-virt / src / main / java / org / opendaylight / netvirt / openstack / netvirt / impl / EventDispatcherImpl.java
1 /*
2  * Copyright (c) 2013, 2015 Red Hat, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.openstack.netvirt.impl;
10
11 import org.opendaylight.netvirt.openstack.netvirt.AbstractEvent;
12 import org.opendaylight.netvirt.openstack.netvirt.AbstractHandler;
13 import org.opendaylight.netvirt.openstack.netvirt.ConfigInterface;
14 import org.opendaylight.netvirt.openstack.netvirt.api.Constants;
15 import org.opendaylight.netvirt.openstack.netvirt.api.EventDispatcher;
16
17 import org.osgi.framework.ServiceReference;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26
27 public class EventDispatcherImpl implements EventDispatcher, ConfigInterface {
28     private static final Logger LOG = LoggerFactory.getLogger(EventDispatcher.class);
29     private ExecutorService eventHandler;
30     private volatile BlockingQueue<AbstractEvent> events;
31     private AbstractHandler[] handlers;
32
33     public EventDispatcherImpl() {
34         events = new LinkedBlockingQueue<>();
35         handlers = new AbstractHandler[AbstractEvent.HandlerType.size];
36         eventHandler = Executors.newSingleThreadExecutor();
37         start();
38     }
39
40     void start() {
41         eventHandler.submit(new Runnable()  {
42             @Override
43             public void run() {
44                 Thread t = Thread.currentThread();
45                 t.setName("EventDispatcherImpl");
46                 LOG.info("EventDispatcherImpl: started {}", t.getName());
47                 while (true) {
48                     AbstractEvent ev;
49                     try {
50                         ev = events.take();
51                     } catch (InterruptedException e) {
52                         LOG.info("The event handler thread was interrupted, shutting down", e);
53                         return;
54                     }
55                     try {
56                         dispatchEvent(ev);
57                     } catch (Exception e) {
58                         LOG.error("Exception in dispatching event {}", ev.toString(), e);
59                     }
60                 }
61             }
62         });
63         LOG.debug("event dispatcher is started");
64     }
65
66     void stop() {
67         // stop accepting new tasks
68         eventHandler.shutdown();
69         try {
70             // Wait a while for existing tasks to terminate
71             if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
72                 eventHandler.shutdownNow();
73                 // Wait a while for tasks to respond to being cancelled
74                 if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
75                     LOG.error("Dispatcher's event handler did not terminate");
76                 }
77             }
78         } catch (InterruptedException e) {
79             // (Re-)Cancel if current thread also interrupted
80             eventHandler.shutdownNow();
81             // Preserve interrupt status
82             Thread.currentThread().interrupt();
83         }
84         LOG.debug("event dispatcher is stopped");
85     }
86
87     private void dispatchEvent(AbstractEvent ev) {
88         LOG.trace("dispatchEvent: Processing (id={}): {}", ev.getTransactionId(), ev);
89         AbstractHandler handler = handlers[ev.getHandlerType().ordinal()];
90         if (handler == null) {
91             LOG.warn("event dispatcher found no handler for {}", ev);
92             return;
93         }
94
95         handler.processEvent(ev);
96         LOG.trace("dispatchEvent: Done processing (id={}): {}", ev.getTransactionId(), ev);
97     }
98
99     public void eventHandlerAdded(final ServiceReference ref, AbstractHandler handler){
100         Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
101         Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
102         if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
103             // The exception should give us a stacktrace
104             LOG.error("Abstract handler reg failed to provide a valid handler type: {} ref: {} handler: {}",
105                     handlerTypeObject, ref.getClass().getName(), handler.getClass().getName(),
106                     new IllegalArgumentException("Missing handler type"));
107             return;
108         }
109         AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
110         handlers[handlerType.ordinal()] = handler;
111         LOG.info("eventHandlerAdded: handler: {}, pid: {}, type: {}",
112                 handler.getClass().getName(), pid, handlerType);
113     }
114
115     public void eventHandlerRemoved(final ServiceReference ref){
116         Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
117         Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
118         if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
119             LOG.error("Abstract handler unreg failed to provide a valid handler type {}", handlerTypeObject);
120             return;
121         }
122         AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
123         handlers[handlerType.ordinal()] = null;
124         LOG.debug("Event handler for type {} unregistered pid {}", handlerType, pid);
125     }
126
127     /**
128      * Enqueue the event.
129      *
130      * @param event the {@link AbstractEvent} event to be handled.
131      */
132     @Override
133     public void enqueueEvent(AbstractEvent event) {
134         if (event == null) {
135             LOG.warn("enqueueEvent: event is null");
136             return;
137         }
138
139         try {
140             events.put(event);
141         } catch (InterruptedException e) {
142             LOG.error("Thread was interrupted while trying to enqueue event ", e);
143         }
144     }
145
146     @Override
147     public void setDependencies(ServiceReference serviceReference) {}
148
149     @Override
150     public void setDependencies(Object impl) {}
151 }