Clean up logging
[netvirt.git] / openstack / net-virt / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / impl / EventDispatcherImpl.java
1 /*
2  * Copyright (C) 2014 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Dave Tucker, Flavio Fernandes
9  */
10
11 package org.opendaylight.ovsdb.openstack.netvirt.impl;
12
13 import org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent;
14 import org.opendaylight.ovsdb.openstack.netvirt.AbstractHandler;
15 import org.opendaylight.ovsdb.openstack.netvirt.ConfigInterface;
16 import org.opendaylight.ovsdb.openstack.netvirt.api.Constants;
17 import org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher;
18
19 import org.osgi.framework.BundleContext;
20 import org.osgi.framework.ServiceReference;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29
30 public class EventDispatcherImpl implements EventDispatcher, ConfigInterface {
31     private static final Logger LOG = LoggerFactory.getLogger(EventDispatcher.class);
32     private ExecutorService eventHandler;
33     private volatile BlockingQueue<AbstractEvent> events;
34     private AbstractHandler[] handlers;
35
36     public EventDispatcherImpl() {
37         events = new LinkedBlockingQueue<>();
38         handlers = new AbstractHandler[AbstractEvent.HandlerType.size];
39         eventHandler = Executors.newSingleThreadExecutor();
40         start();
41     }
42
43     void start() {
44         eventHandler.submit(new Runnable()  {
45             @Override
46             public void run() {
47                 Thread t = Thread.currentThread();
48                 t.setName("EventDispatcherImpl");
49                 LOG.info("EventDispatcherImpl: started {}", t.getName());
50                 while (true) {
51                     AbstractEvent ev;
52                     try {
53                         ev = events.take();
54                     } catch (InterruptedException e) {
55                         LOG.info("The event handler thread was interrupted, shutting down", e);
56                         return;
57                     }
58                     try {
59                         dispatchEvent(ev);
60                     } catch (Exception e) {
61                         LOG.error("Exception in dispatching event {}", ev.toString(), e);
62                     }
63                 }
64             }
65         });
66         LOG.debug("event dispatcher is started");
67     }
68
69     void stop() {
70         // stop accepting new tasks
71         eventHandler.shutdown();
72         try {
73             // Wait a while for existing tasks to terminate
74             if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
75                 eventHandler.shutdownNow();
76                 // Wait a while for tasks to respond to being cancelled
77                 if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
78                     LOG.error("Dispatcher's event handler did not terminate");
79                 }
80             }
81         } catch (InterruptedException e) {
82             // (Re-)Cancel if current thread also interrupted
83             eventHandler.shutdownNow();
84             // Preserve interrupt status
85             Thread.currentThread().interrupt();
86         }
87         LOG.debug("event dispatcher is stopped");
88     }
89
90     private void dispatchEvent(AbstractEvent ev) {
91         AbstractHandler handler = handlers[ev.getHandlerType().ordinal()];
92         if (handler == null) {
93             LOG.warn("event dispatcher found no handler for {}", ev);
94             return;
95         }
96
97         handler.processEvent(ev);
98     }
99
100     public void eventHandlerAdded(final ServiceReference ref, AbstractHandler handler){
101         Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
102         Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
103         if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
104             LOG.error("Abstract handler reg failed to provide a valid handler type: {} ref: {} handler: {}",
105                     handlerTypeObject, ref.getClass().getName(), handler.getClass().getName());
106             return;
107         }
108         AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
109         handlers[handlerType.ordinal()] = handler;
110         LOG.info("eventHandlerAdded: handler: {}, pid: {}, type: {}",
111                 handler.getClass().getName(), pid, handlerType);
112     }
113
114     public void eventHandlerRemoved(final ServiceReference ref){
115         Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
116         Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
117         if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
118             LOG.error("Abstract handler unreg failed to provide a valid handler type {}", handlerTypeObject);
119             return;
120         }
121         AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
122         handlers[handlerType.ordinal()] = null;
123         LOG.debug("Event handler for type {} unregistered pid {}", handlerType, pid);
124     }
125
126     /**
127      * Enqueue the event.
128      *
129      * @param event the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
130      */
131     @Override
132     public void enqueueEvent(AbstractEvent event) {
133         if (event == null) {
134             LOG.warn("enqueueEvent: event is null");
135             return;
136         }
137
138         try {
139             events.put(event);
140         } catch (InterruptedException e) {
141             LOG.error("Thread was interrupted while trying to enqueue event ", e);
142         }
143     }
144
145     @Override
146     public void setDependencies(BundleContext bundleContext, ServiceReference serviceReference) {}
147
148     @Override
149     public void setDependencies(Object impl) {}
150 }