1e853e0877fc313674605c33c89f05c498a9699c
[genius.git] / mdsalutil / mdsalutil-api / src / main / java / org / opendaylight / genius / datastoreutils / AsyncDataChangeListenerBase.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.genius.datastoreutils;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import java.util.Collections;
13 import java.util.Map;
14 import java.util.Set;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ThreadPoolExecutor;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.PreDestroy;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.yangtools.concepts.ListenerRegistration;
26 import org.opendaylight.yangtools.yang.binding.DataObject;
27 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 public abstract class AsyncDataChangeListenerBase<T extends DataObject, K extends DataChangeListener> implements DataChangeListener, AutoCloseable {
32     private static final Logger LOG = LoggerFactory.getLogger(AsyncDataChangeListenerBase.class);
33
34     private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE = 1;
35     private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE = 1;
36     private static final int DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS = 300;
37     private static final int STARTUP_LOOP_TICK = 500;
38     private static final int STARTUP_LOOP_MAX_RETRIES = 8;
39
40     private static ThreadPoolExecutor dataChangeHandlerExecutor = new ThreadPoolExecutor(
41             DATATREE_CHANGE_HANDLER_THREAD_POOL_CORE_SIZE,
42             DATATREE_CHANGE_HANDLER_THREAD_POOL_MAX_SIZE,
43             DATATREE_CHANGE_HANDLER_THREAD_POOL_KEEP_ALIVE_TIME_SECS,
44             TimeUnit.SECONDS,
45             new LinkedBlockingQueue<>());
46
47     private ListenerRegistration<K> listenerRegistration;
48     protected final Class<T> clazz;
49     private final Class<K> eventClazz;
50
51     /**
52      * @param clazz - for which the data change event is received
53      */
54     public AsyncDataChangeListenerBase(Class<T> clazz, Class<K> eventClazz) {
55         this.clazz = Preconditions.checkNotNull(clazz, "Class can not be null!");
56         this.eventClazz = Preconditions.checkNotNull(eventClazz, "eventClazz can not be null!");
57     }
58
59     public void registerListener(final LogicalDatastoreType dsType, final DataBroker db) {
60         try {
61             TaskRetryLooper looper = new TaskRetryLooper(STARTUP_LOOP_TICK, STARTUP_LOOP_MAX_RETRIES);
62             listenerRegistration = looper.loopUntilNoException(
63                     (Callable<ListenerRegistration<K>>) () -> (ListenerRegistration) db.registerDataChangeListener(
64                             dsType, getWildCardPath(), getDataChangeListener(), getDataChangeScope()));
65         } catch (final Exception e) {
66             LOG.warn("{}: Data Tree Change listener registration failed.", eventClazz.getName());
67             LOG.debug("{}: Data Tree Change listener registration failed: {}", eventClazz.getName(), e);
68             throw new IllegalStateException( eventClazz.getName() + "{}startup failed. System needs restart.", e);
69         }
70     }
71
72     @Override
73     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
74         if (changeEvent == null) {
75             return;
76         }
77
78         DataChangeHandler dataChangeHandler = new DataChangeHandler(changeEvent);
79         dataChangeHandlerExecutor.execute(dataChangeHandler);
80     }
81
82     @SuppressWarnings("unchecked")
83     private void createData(final Map<InstanceIdentifier<?>, DataObject> createdData) {
84         final Set<InstanceIdentifier<?>> keys = createdData.keySet() != null
85                 ? createdData.keySet() : Collections.emptySet();
86         for (InstanceIdentifier<?> key : keys) {
87             if (clazz.equals(key.getTargetType())) {
88                 InstanceIdentifier<T> createKeyIdent = key.firstIdentifierOf(clazz);
89                 final Optional<DataObject> value = Optional.of(createdData.get(key));
90                 if (value.isPresent()) {
91                     this.add(createKeyIdent, (T)value.get());
92                 }
93             }
94         }
95     }
96
97     @SuppressWarnings("unchecked")
98     private void updateData(final Map<InstanceIdentifier<?>, DataObject> updateData,
99                             final Map<InstanceIdentifier<?>, DataObject> originalData) {
100
101         final Set<InstanceIdentifier<?>> keys = updateData.keySet() != null
102                 ? updateData.keySet() : Collections.emptySet();
103         for (InstanceIdentifier<?> key : keys) {
104             if (clazz.equals(key.getTargetType())) {
105                 InstanceIdentifier<T> updateKeyIdent = key.firstIdentifierOf(clazz);
106                 final Optional<DataObject> value = Optional.of(updateData.get(key));
107                 final Optional<DataObject> original = Optional.of(originalData.get(key));
108                 if (value.isPresent() && original.isPresent()) {
109                     this.update(updateKeyIdent, (T) original.get(), (T) value.get());
110                 }
111             }
112         }
113     }
114
115     @SuppressWarnings("unchecked")
116     private void removeData(final Set<InstanceIdentifier<?>> removeData,
117                             final Map<InstanceIdentifier<?>, DataObject> originalData) {
118
119         for (InstanceIdentifier<?> key : removeData) {
120             if (clazz.equals(key.getTargetType())) {
121                 final InstanceIdentifier<T> ident = key.firstIdentifierOf(clazz);
122                 final DataObject removeValue = originalData.get(key);
123                 this.remove(ident, (T)removeValue);
124             }
125         }
126     }
127
128     @Override
129     @PreDestroy
130     public void close() throws Exception {
131         if (listenerRegistration != null) {
132             try {
133                 listenerRegistration.close();
134             } catch (final Exception e) {
135                 LOG.error("Error when cleaning up DataChangeListener.", e);
136             }
137             listenerRegistration = null;
138         }
139         LOG.info("Interface Manager Closed");
140     }
141
142     protected abstract void remove(InstanceIdentifier<T> identifier, T del);
143
144     protected abstract void update(InstanceIdentifier<T> identifier, T original, T update);
145
146     protected abstract void add(InstanceIdentifier<T> identifier, T add);
147
148     protected abstract InstanceIdentifier<T> getWildCardPath();
149
150     protected abstract DataChangeListener getDataChangeListener();
151
152     protected abstract AsyncDataBroker.DataChangeScope getDataChangeScope();
153
154     public class DataChangeHandler implements Runnable {
155         final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent;
156
157         public DataChangeHandler(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changeEvent) {
158             this.changeEvent = changeEvent;
159         }
160
161         @Override
162         public void run() {
163             Preconditions.checkNotNull(changeEvent,"Async ChangeEvent can not be null!");
164
165             /* All DataObjects for create */
166             final Map<InstanceIdentifier<?>, DataObject> createdData = changeEvent.getCreatedData() != null
167                     ? changeEvent.getCreatedData() : Collections.emptyMap();
168             /* All DataObjects for remove */
169             final Set<InstanceIdentifier<?>> removeData = changeEvent.getRemovedPaths() != null
170                     ? changeEvent.getRemovedPaths() : Collections.emptySet();
171             /* All DataObjects for updates */
172             final Map<InstanceIdentifier<?>, DataObject> updateData = changeEvent.getUpdatedData() != null
173                     ? changeEvent.getUpdatedData() : Collections.emptyMap();
174             /* All Original DataObjects */
175             final Map<InstanceIdentifier<?>, DataObject> originalData = changeEvent.getOriginalData() != null
176                     ? changeEvent.getOriginalData() : Collections.emptyMap();
177
178             createData(createdData);
179             updateData(updateData, originalData);
180             removeData(removeData, originalData);
181         }
182     }
183 }