Import atomix/{storage,utils}
[controller.git] / third-party / atomix / utils / src / main / java / io / atomix / utils / event / ListenerRegistry.java
1 /*
2  * Copyright 2015-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.utils.event;
17
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.util.Set;
22 import java.util.concurrent.CopyOnWriteArraySet;
23
24 import static com.google.common.base.Preconditions.checkNotNull;
25
26 /**
27  * Base implementation of an event sink and a registry capable of tracking
28  * listeners and dispatching events to them as part of event sink processing.
29  */
30 public class ListenerRegistry<E extends Event, L extends EventListener<E>>
31     implements ListenerService<E, L>, EventSink<E> {
32
33   private static final long LIMIT = 1_800; // ms
34
35   private final Logger log = LoggerFactory.getLogger(getClass());
36
37   private long lastStart;
38   private L lastListener;
39
40   /**
41    * Set of listeners that have registered.
42    */
43   protected final Set<L> listeners = new CopyOnWriteArraySet<>();
44
45   @Override
46   public void addListener(L listener) {
47     checkNotNull(listener, "Listener cannot be null");
48     listeners.add(listener);
49   }
50
51   @Override
52   public void removeListener(L listener) {
53     checkNotNull(listener, "Listener cannot be null");
54     if (!listeners.remove(listener)) {
55       log.warn("Listener {} not registered", listener);
56     }
57   }
58
59   @Override
60   public void process(E event) {
61     for (L listener : listeners) {
62       try {
63         lastListener = listener;
64         lastStart = System.currentTimeMillis();
65         if (listener.isRelevant(event)) {
66           listener.event(event);
67         }
68         lastStart = 0;
69       } catch (Exception error) {
70         reportProblem(event, error);
71       }
72     }
73   }
74
75   @Override
76   public void onProcessLimit() {
77     if (lastStart > 0) {
78       long duration = System.currentTimeMillis() - lastStart;
79       if (duration > LIMIT) {
80         log.error("Listener {} exceeded execution time limit: {} ms; ejected",
81             lastListener.getClass().getName(),
82             duration);
83         removeListener(lastListener);
84       }
85       lastStart = 0;
86     }
87   }
88
89   /**
90    * Reports a problem encountered while processing an event.
91    *
92    * @param event event being processed
93    * @param error error encountered while processing
94    */
95   protected void reportProblem(E event, Throwable error) {
96     log.warn("Exception encountered while processing event " + event, error);
97   }
98
99 }