2 * Copyright (C) 2014 Red Hat, Inc.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Dave Tucker, Flavio Fernandes
11 package org.opendaylight.ovsdb.openstack.netvirt.impl;
13 import org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent;
14 import org.opendaylight.ovsdb.openstack.netvirt.AbstractHandler;
15 import org.opendaylight.ovsdb.openstack.netvirt.api.Constants;
16 import org.opendaylight.ovsdb.openstack.netvirt.api.EventDispatcher;
18 import org.osgi.framework.ServiceReference;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.TimeUnit;
28 public class EventDispatcherImpl implements EventDispatcher {
30 static final Logger logger = LoggerFactory.getLogger(EventDispatcher.class);
31 private ExecutorService eventHandler;
32 private BlockingQueue<AbstractEvent> events;
34 private AbstractHandler[] handlers;
37 eventHandler = Executors.newSingleThreadExecutor();
38 this.events = new LinkedBlockingQueue<>();
39 this.handlers = new AbstractHandler[AbstractEvent.HandlerType.size];
43 eventHandler.submit(new Runnable() {
50 } catch (InterruptedException e) {
51 logger.info("The event handler thread was interrupted, shutting down", e);
58 logger.debug("event dispatcher is started");
62 // stop accepting new tasks
63 eventHandler.shutdown();
65 // Wait a while for existing tasks to terminate
66 if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
67 eventHandler.shutdownNow();
68 // Wait a while for tasks to respond to being cancelled
69 if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS))
70 logger.error("Dispatcher's event handler did not terminate");
72 } catch (InterruptedException e) {
73 // (Re-)Cancel if current thread also interrupted
74 eventHandler.shutdownNow();
75 // Preserve interrupt status
76 Thread.currentThread().interrupt();
78 logger.debug("event dispatcher is stopped");
81 private void dispatchEvent(AbstractEvent ev) {
82 AbstractHandler handler = handlers[ev.getHandlerType().ordinal()];
83 if (handler == null) {
84 logger.warn("event dispatcher found no handler for " + ev);
88 handler.processEvent(ev);
91 public void eventHandlerAdded(final ServiceReference ref, AbstractHandler handler){
92 Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
93 Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
94 if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
95 logger.error("Abstract handler reg failed to provide a valid handler type " + handlerTypeObject);
98 AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
99 handlers[handlerType.ordinal()] = handler;
101 logger.debug("Event handler for type {} registered for {}, pid {}",
102 handlerType, handler.getClass().getName(), pid);
105 public void eventHandlerRemoved(final ServiceReference ref){
106 Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
107 Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
108 if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
109 logger.error("Abstract handler unreg failed to provide a valid handler type " + handlerTypeObject);
112 AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
113 handlers[handlerType.ordinal()] = null;
115 logger.debug("Event handler for type {} unregistered pid {}", handlerType, pid);
121 * @param event the {@link org.opendaylight.ovsdb.openstack.netvirt.AbstractEvent} event to be handled.
124 public void enqueueEvent(AbstractEvent event) {
131 } catch (InterruptedException e) {
132 logger.error("Thread was interrupted while trying to enqueue event ", e);