Merge "Updated BgpManager for Be"
[vpnservice.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / vpnservice / datastoreutils / AsyncClusteredDataChangeListenerBase.java
1 /*
2  * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.vpnservice.datastoreutils;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import org.opendaylight.controller.md.sal.binding.api.*;
13 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
14 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
15 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
16 import org.opendaylight.yangtools.concepts.ListenerRegistration;
17 import org.opendaylight.yangtools.yang.binding.DataObject;
18 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import java.util.Collections;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29
30 public abstract class AsyncClusteredDataChangeListenerBase<T extends DataObject, K extends ClusteredDataChangeListener> implements ClusteredDataChangeListener, AutoCloseable {
31     private static final Logger LOG = LoggerFactory.getLogger(AsyncClusteredDataChangeListenerBase.class);
32
33     private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE = 1;
34     private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE = 1;
35     private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300;
36     private static final int STARTUP_LOOP_TICK = 500;
37     private static final int STARTUP_LOOP_MAX_RETRIES = 8;
38
39     private static ThreadPoolExecutor dataChangeHandlerExecutor = new ThreadPoolExecutor(
40             DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE,
41             DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE,
42             DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS,
43             TimeUnit.SECONDS,
44             new LinkedBlockingQueue<Runnable>());
45
46     private ListenerRegistration<K> listenerRegistration;
47     protected final Class<T> clazz;
48     private final Class<K> eventClazz;
49
50     /**
51      * @param clazz - for which the data change event is received
52      */
53     public AsyncClusteredDataChangeListenerBase(Class<T> clazz, Class<K> eventClazz) {
54         this.clazz = Preconditions.checkNotNull(clazz, "Class can not be null!");
55         this.eventClazz = Preconditions.checkNotNull(eventClazz, "eventClazz can not be null!");
56     }
57
58     public void registerListener(final LogicalDatastoreType dsType, final DataBroker db) {
59         try {
60             TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
61             listenerRegistration = looper.loopUntilNoException(new Callable<ListenerRegistration<K>>() {
62                 @Override
63                 public ListenerRegistration call() throws Exception {
64                     return db.registerDataChangeListener(dsType, getWildCardPath(), getDataChangeListener(), getDataChangeScope());
65                 }
66             });
67         } catch (final Exception e) {
68             LOG.warn("{}: Data Tree Change listener registration failed.", eventClazz.getName());
69             LOG.debug("{}: Data Tree Change listener registration failed: {}", eventClazz.getName(), e);
70             throw new IllegalStateException( eventClazz.getName() + "{}startup failed. System needs restart.", e);
71         }
72     }
73
74     @Override
75     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
76         if (changeEvent == null) {
77             return;
78         }
79
80         DataChangeHandler dataChangeHandler = new DataChangeHandler(changeEvent);
81         dataChangeHandlerExecutor.execute(dataChangeHandler);
82     }
83
84     @SuppressWarnings("unchecked")
85     private void createData(final Map<InstanceIdentifier<?>, DataObject> createdData) {
86         final Set<InstanceIdentifier<?>> keys = createdData.keySet() != null
87                 ? createdData.keySet() : Collections.<InstanceIdentifier<?>>emptySet();
88         for (InstanceIdentifier<?> key : keys) {
89             if (clazz.equals(key.getTargetType())) {
90                 InstanceIdentifier<T> createKeyIdent = key.firstIdentifierOf(clazz);
91                 final Optional<DataObject> value = Optional.of(createdData.get(key));
92                 if (value.isPresent()) {
93                     this.add(createKeyIdent, (T)value.get());
94                 }
95             }
96         }
97     }
98
99     @SuppressWarnings("unchecked")
100     private void updateData(final Map<InstanceIdentifier<?>, DataObject> updateData,
101                             final Map<InstanceIdentifier<?>, DataObject> originalData) {
102
103         final Set<InstanceIdentifier<?>> keys = updateData.keySet() != null
104                 ? updateData.keySet() : Collections.<InstanceIdentifier<?>>emptySet();
105         for (InstanceIdentifier<?> key : keys) {
106             if (clazz.equals(key.getTargetType())) {
107                 InstanceIdentifier<T> updateKeyIdent = key.firstIdentifierOf(clazz);
108                 final Optional<DataObject> value = Optional.of(updateData.get(key));
109                 final Optional<DataObject> original = Optional.of(originalData.get(key));
110                 if (value.isPresent() && original.isPresent()) {
111                     this.update(updateKeyIdent, (T) original.get(), (T) value.get());
112                 }
113             }
114         }
115     }
116
117     @SuppressWarnings("unchecked")
118     private void removeData(final Set<InstanceIdentifier<?>> removeData,
119                             final Map<InstanceIdentifier<?>, DataObject> originalData) {
120
121         for (InstanceIdentifier<?> key : removeData) {
122             if (clazz.equals(key.getTargetType())) {
123                 final InstanceIdentifier<T> ident = key.firstIdentifierOf(clazz);
124                 final DataObject removeValue = originalData.get(key);
125                 this.remove(ident, (T)removeValue);
126             }
127         }
128     }
129
130     @Override
131     public void close() throws Exception {
132         if (listenerRegistration != null) {
133             try {
134                 listenerRegistration.close();
135             } catch (final Exception e) {
136                 LOG.error("Error when cleaning up ClusteredDataChangeListener.", e);
137             }
138             listenerRegistration = null;
139         }
140         LOG.info("Interface Manager Closed");
141     }
142
143     protected abstract void remove(InstanceIdentifier<T> identifier, T del);
144
145     protected abstract void update(InstanceIdentifier<T> identifier, T original, T update);
146
147     protected abstract void add(InstanceIdentifier<T> identifier, T add);
148
149     protected abstract InstanceIdentifier<T> getWildCardPath();
150
151     protected abstract ClusteredDataChangeListener getDataChangeListener();
152
153     protected abstract AsyncDataBroker.DataChangeScope getDataChangeScope();
154
155     public class DataChangeHandler implements Runnable {
156         final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent;
157
158         public DataChangeHandler(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
159             this.changeEvent = changeEvent;
160         }
161
162         @Override
163         public void run() {
164             Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!");
165
166             /* All DataObjects for create */
167             final Map<InstanceIdentifier<?>, DataObject> createdData = changeEvent.getCreatedData() != null
168                     ? changeEvent.getCreatedData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
169             /* All DataObjects for remove */
170             final Set<InstanceIdentifier<?>> removeData = changeEvent.getRemovedPaths() != null
171                     ? changeEvent.getRemovedPaths() : Collections.<InstanceIdentifier<?>>emptySet();
172             /* All DataObjects for updates */
173             final Map<InstanceIdentifier<?>, DataObject> updateData = changeEvent.getUpdatedData() != null
174                     ? changeEvent.getUpdatedData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
175             /* All Original DataObjects */
176             final Map<InstanceIdentifier<?>, DataObject> originalData = changeEvent.getOriginalData() != null
177                     ? changeEvent.getOriginalData() : Collections.<InstanceIdentifier<?>, DataObject>emptyMap();
178
179             createData(createdData);
180             updateData(updateData, originalData);
181             removeData(removeData, originalData);
182         }
183     }
184 }