Merge "Bug 1885: VM ingress rule is not installed for vlan networking"
[ovsdb.git] / openstack / net-virt / src / main / java / org / opendaylight / ovsdb / openstack / netvirt / impl / EventDispatcherImpl.java
1 /*
2  * Copyright (C) 2014 Red Hat, Inc.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  *
8  * Authors : Dave Tucker, Flavio Fernandes
9  */
10
11 package org.opendaylight.ovsdb.openstack.netvirt.impl;
12
13 import org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent;
14 import org.opendaylight.ovsdb.openstack.netvirt.AbstractHandler;
15 import org.opendaylight.ovsdb.openstack.netvirt.api.Constants;
16 import org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher;
17
18 import org.osgi.framework.ServiceReference;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.TimeUnit;
27
28 public class EventDispatcherImpl implements EventDispatcher {
29
30     static final Logger logger = LoggerFactory.getLogger(EventDispatcher.class);
31     private ExecutorService eventHandler;
32     private volatile BlockingQueue<AbstractEvent> events;
33
34     private AbstractHandler[] handlers;
35
36     void init() {
37         eventHandler = Executors.newSingleThreadExecutor();
38         this.events = new LinkedBlockingQueue<>();
39         this.handlers = new AbstractHandler[AbstractEvent.HandlerType.size];
40     }
41
42     void start() {
43         eventHandler.submit(new Runnable()  {
44             @Override
45             public void run() {
46                 while (true) {
47                     AbstractEvent ev;
48                     try {
49                         ev = events.take();
50                     } catch (InterruptedException e) {
51                         logger.info("The event handler thread was interrupted, shutting down", e);
52                         return;
53                     }
54                     try {
55                         dispatchEvent(ev);
56                     } catch (Exception e) {
57                         logger.error("Exception in dispatching event "+ev.toString(), e);
58                     }
59                 }
60             }
61         });
62         logger.debug("event dispatcher is started");
63     }
64
65     void stop() {
66         // stop accepting new tasks
67         eventHandler.shutdown();
68         try {
69             // Wait a while for existing tasks to terminate
70             if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
71                 eventHandler.shutdownNow();
72                 // Wait a while for tasks to respond to being cancelled
73                 if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS))
74                     logger.error("Dispatcher's event handler did not terminate");
75             }
76         } catch (InterruptedException e) {
77             // (Re-)Cancel if current thread also interrupted
78             eventHandler.shutdownNow();
79             // Preserve interrupt status
80             Thread.currentThread().interrupt();
81         }
82         logger.debug("event dispatcher is stopped");
83     }
84
85     private void dispatchEvent(AbstractEvent ev) {
86         AbstractHandler handler = handlers[ev.getHandlerType().ordinal()];
87         if (handler == null) {
88             logger.warn("event dispatcher found no handler for " + ev);
89             return;
90         }
91
92         handler.processEvent(ev);
93     }
94
95     public void eventHandlerAdded(final ServiceReference ref, AbstractHandler handler){
96         Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
97         Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
98         if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
99             logger.error("Abstract handler reg failed to provide a valid handler type " + handlerTypeObject);
100             return;
101         }
102         AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
103         handlers[handlerType.ordinal()] = handler;
104
105         logger.debug("Event handler for type {} registered for {}, pid {}",
106                      handlerType, handler.getClass().getName(), pid);
107     }
108
109     public void eventHandlerRemoved(final ServiceReference ref){
110         Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
111         Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
112         if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
113             logger.error("Abstract handler unreg failed to provide a valid handler type " + handlerTypeObject);
114             return;
115         }
116         AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
117         handlers[handlerType.ordinal()] = null;
118
119         logger.debug("Event handler for type {} unregistered pid {}", handlerType, pid);
120     }
121
122     /**
123      * Enqueue the event.
124      *
125      * @param event the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
126      */
127     @Override
128     public void enqueueEvent(AbstractEvent event) {
129         if (event == null) {
130             return;
131         }
132
133         try {
134             events.put(event);
135         } catch (InterruptedException e) {
136             logger.error("Thread was interrupted while trying to enqueue event ", e);
137         }
138     }
139 }