NETVIRT-1630 migrate to md-sal APIs
[netvirt.git] / elanmanager / impl / src / main / java / org / opendaylight / netvirt / elan / internal / ElanGroupCache.java
1 /*
2  * Copyright (c) 2019 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netvirt.elan.internal;
10
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Map;
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import javax.annotation.PreDestroy;
19 import javax.inject.Inject;
20 import javax.inject.Singleton;
21 import org.opendaylight.infrautils.utils.concurrent.Executors;
22 import org.opendaylight.mdsal.binding.api.DataBroker;
23 import org.opendaylight.mdsal.binding.api.ReadTransaction;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.netvirt.elan.utils.Scheduler;
26 import org.opendaylight.serviceutils.tools.listener.AbstractClusteredAsyncDataTreeChangeListener;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 @Singleton
36 public class ElanGroupCache extends AbstractClusteredAsyncDataTreeChangeListener<Group> {
37     private static final Logger LOG = LoggerFactory.getLogger(ElanGroupCache.class);
38     private final DataBroker dataBroker;
39     private final Scheduler scheduler;
40     private final Map<InstanceIdentifier<Group>, Group> groupsById = new ConcurrentHashMap<>();
41     private final Map<InstanceIdentifier<Group>, Collection<Runnable>> waitingJobs = new ConcurrentHashMap<>();
42
43     @Inject
44     public ElanGroupCache(final DataBroker dataBroker, final Scheduler scheduler) {
45         super(dataBroker, LogicalDatastoreType.CONFIGURATION, InstanceIdentifier.create(Nodes.class)
46                 .child(Node.class).augmentation(FlowCapableNode.class).child(Group.class),
47                 Executors.newListeningSingleThreadExecutor("ElanGroupCache", LOG));
48         this.dataBroker = dataBroker;
49         this.scheduler = scheduler;
50     }
51
52     public synchronized void init() {
53         LOG.info("{} registered", getClass().getSimpleName());
54     }
55
56     public synchronized void addJobToWaitList(InstanceIdentifier<Group> key,
57                                               Runnable job) {
58         if (groupsById.containsKey(key)) {
59             job.run();
60         } else {
61             waitingJobs.putIfAbsent(key, new ArrayList<>());
62             waitingJobs.get(key).add(job);
63         }
64     }
65
66     @Override
67     public synchronized void remove(InstanceIdentifier<Group> key, Group deleted) {
68         groupsById.remove(key);
69     }
70
71     @Override
72     public void update(InstanceIdentifier<Group> key, Group old, Group updated) {
73         add(key, updated);
74     }
75
76     @Override
77     public synchronized void add(InstanceIdentifier<Group> key, Group added) {
78         if (groupsById.containsKey(key)) {
79             groupsById.put(key, added);
80             return;
81         }
82         scheduler.getScheduledExecutorService().schedule(() -> {
83             groupsById.put(key, added);
84             Collection<Runnable> jobs = waitingJobs.remove(key);
85             if (jobs == null) {
86                 return;
87             }
88             for (Runnable job : jobs) {
89                 job.run();
90             }
91         }, ElanInterfaceManager.WAIT_TIME_FOR_SYNC_INSTALL, TimeUnit.MILLISECONDS);
92     }
93
94     public Optional<Group> getGroup(InstanceIdentifier<Group> key) throws InterruptedException, ExecutionException {
95         if (groupsById.containsKey(key)) {
96             return Optional.of(groupsById.get(key));
97         }
98         ReadTransaction transaction = dataBroker.newReadOnlyTransaction();
99         Optional<Group> optional = transaction.read(LogicalDatastoreType.CONFIGURATION, key).get();
100         transaction.close();
101         return optional;
102     }
103
104     @Override
105     @PreDestroy
106     public void close() {
107         super.close();
108         Executors.shutdownAndAwaitTermination(getExecutorService());
109     }
110
111 }