2 * Copyright (c) 2013, 2015 Red Hat, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netvirt.openstack.netvirt.impl;
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import org.opendaylight.netvirt.openstack.netvirt.AbstractEvent;
13 import org.opendaylight.netvirt.openstack.netvirt.AbstractHandler;
14 import org.opendaylight.netvirt.openstack.netvirt.ConfigInterface;
15 import org.opendaylight.netvirt.openstack.netvirt.api.Constants;
16 import org.opendaylight.netvirt.openstack.netvirt.api.EventDispatcher;
18 import org.osgi.framework.ServiceReference;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.ThreadFactory;
// Event dispatcher implementing EventDispatcher and ConfigInterface. It holds a
// queue of AbstractEvent objects and a table of AbstractHandler objects that
// process them.
29 public class EventDispatcherImpl implements EventDispatcher, ConfigInterface {
// NOTE(review): the logger is created for EventDispatcher.class (the interface),
// not EventDispatcherImpl.class - confirm this is intentional.
30 private static final Logger LOG = LoggerFactory.getLogger(EventDispatcher.class);
// Executor to which the dispatching task is submitted.
31 private ExecutorService eventHandler;
// Pending events; the constructor assigns a LinkedBlockingQueue.
32 private volatile BlockingQueue<AbstractEvent> events;
// Handler table indexed by AbstractEvent.HandlerType ordinal; a null slot means
// no handler is registered for that type.
33 private AbstractHandler[] handlers;
/**
 * Creates the dispatcher state: an empty event queue, the handler table, and a
 * single-thread executor built from a thread factory with the name format
 * "NV-EvtDsptchr-%d".
 */
// NOTE(review): the lines that follow the executor creation and close this
// constructor are not visible in this view.
35 public EventDispatcherImpl() {
36 events = new LinkedBlockingQueue<>();
// Table length comes from AbstractEvent.HandlerType.size (presumably the number
// of handler types - confirm against AbstractEvent).
37 handlers = new AbstractHandler[AbstractEvent.HandlerType.size];
38 ThreadFactory threadFactory = new ThreadFactoryBuilder()
39 .setNameFormat("NV-EvtDsptchr-%d").build();
40 eventHandler = Executors.newSingleThreadExecutor(threadFactory);
// NOTE(review): the enclosing method header is not visible in this view; the
// final log line suggests this is the dispatcher's start routine - confirm.
// Submits a task to the executor. The task renames its thread to
// "EventDispatcherImpl" and logs that it has started.
45 eventHandler.submit(() -> {
46 Thread t = Thread.currentThread();
47 t.setName("EventDispatcherImpl");
48 LOG.info("EventDispatcherImpl: started {}", t.getName());
// NOTE(review): the statements guarded by this catch are not visible here
// (presumably a blocking read from the events queue - confirm). An interrupt is
// logged at info level as the thread shutting down.
53 } catch (InterruptedException e) {
54 LOG.info("The event handler thread was interrupted, shutting down", e);
// Any other exception raised while dispatching is logged together with the event.
59 } catch (Exception e) {
60 LOG.error("Exception in dispatching event {}", ev.toString(), e);
64 LOG.debug("event dispatcher is started");
// NOTE(review): the enclosing method header is not visible in this view; the
// final log line suggests this is the dispatcher's stop routine - confirm.
// Shuts the executor down in stages: shutdown(), wait up to 10 seconds, then
// shutdownNow() and wait up to 10 more seconds, logging an error if the executor
// still has not terminated.
68 // stop accepting new tasks
69 eventHandler.shutdown();
71 // Wait a while for existing tasks to terminate
72 if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
73 eventHandler.shutdownNow();
74 // Wait a while for tasks to respond to being cancelled
75 if (!eventHandler.awaitTermination(10, TimeUnit.SECONDS)) {
76 LOG.error("Dispatcher's event handler did not terminate");
// If the waiting thread is itself interrupted, the executor is cancelled again
// and the interrupt flag is restored for the caller.
79 } catch (InterruptedException e) {
80 // (Re-)Cancel if current thread also interrupted
81 eventHandler.shutdownNow();
82 // Preserve interrupt status
83 Thread.currentThread().interrupt();
85 LOG.debug("event dispatcher is stopped");
/**
 * Hands an event to the handler registered for the event's handler type,
 * logging at trace level before and after processing.
 *
 * @param ev the event to process; the ordinal of its handler type selects the
 *           slot in the handlers table
 */
88 private void dispatchEvent(AbstractEvent ev) {
89 LOG.trace("dispatchEvent: Processing (id={}): Event : {}", ev.getTransactionId(), ev);
90 AbstractHandler handler = handlers[ev.getHandlerType().ordinal()];
// Empty slot: no handler is registered for this type, so a warning is logged.
91 if (handler == null) {
92 LOG.warn("event dispatcher found no handler for : {}", ev);
// NOTE(review): the lines closing the branch above are not visible here;
// presumably the method returns before reaching this call - confirm.
96 handler.processEvent(ev);
97 LOG.trace("dispatchEvent: Done processing (id={}): Event : {}", ev.getTransactionId(), ev);
/**
 * Stores a handler in the table slot for the handler type carried by the
 * service reference, replacing whatever was stored in that slot.
 *
 * @param ref     service reference; its Constants.EVENT_HANDLER_TYPE_PROPERTY
 *                property is read as the handler type, and its SERVICE_ID
 *                property is used only in the debug log
 * @param handler the handler to store for that type
 */
100 public void eventHandlerAdded(final ServiceReference ref, AbstractHandler handler){
101 Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
102 Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
// A missing property or one of the wrong type is logged as an error.
103 if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
104 // The exception should give us a stacktrace
105 LOG.error("Abstract handler reg failed to provide a valid handler type: {} ref: {} handler: {}",
106 handlerTypeObject, ref.getClass().getName(), handler.getClass().getName(),
107 new IllegalArgumentException("Missing handler type"));
// NOTE(review): the lines closing the branch above are not visible here;
// presumably the method returns before the cast below - confirm.
110 AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
111 handlers[handlerType.ordinal()] = handler;
112 LOG.debug("eventHandlerAdded: handler: {}, pid: {}, type: {}",
113 handler.getClass().getName(), pid, handlerType);
/**
 * Clears the table slot for the handler type carried by the service reference.
 *
 * @param ref service reference; its Constants.EVENT_HANDLER_TYPE_PROPERTY
 *            property is read as the handler type, and its SERVICE_ID property
 *            is used only in the debug log
 */
116 public void eventHandlerRemoved(final ServiceReference ref){
117 Long pid = (Long) ref.getProperty(org.osgi.framework.Constants.SERVICE_ID);
118 Object handlerTypeObject = ref.getProperty(Constants.EVENT_HANDLER_TYPE_PROPERTY);
// A missing property or one of the wrong type is logged as an error.
119 if (!(handlerTypeObject instanceof AbstractEvent.HandlerType)){
120 LOG.error("Abstract handler unreg failed to provide a valid handler type {}", handlerTypeObject);
// NOTE(review): the lines closing the branch above are not visible here;
// presumably the method returns before the cast below - confirm.
123 AbstractEvent.HandlerType handlerType = (AbstractEvent.HandlerType) handlerTypeObject;
// Setting the slot to null marks the type as having no handler.
124 handlers[handlerType.ordinal()] = null;
125 LOG.debug("Event handler for type {} unregistered pid {}", handlerType, pid);
// Entry point for handing an event to the dispatcher.
// NOTE(review): the opening of the original Javadoc, the null check and the
// statement that performs the enqueue are not visible in this view. The log
// messages indicate that a null event produces a warning and that an interrupt
// during enqueueing is logged as an error - confirm against the full file.
131 * @param event the {@link AbstractEvent} event to be handled.
134 public void enqueueEvent(AbstractEvent event) {
136 LOG.warn("enqueueEvent: event is null");
142 } catch (InterruptedException e) {
143 LOG.error("Thread was interrupted while trying to enqueue event ", e);
/**
 * Does nothing; the body is empty.
 *
 * @param serviceReference not used
 */
148 public void setDependencies(ServiceReference serviceReference) {}
/**
 * Does nothing; the body is empty.
 *
 * @param impl not used
 */
151 public void setDependencies(Object impl) {}