807589fd0c9a27a894478e2502e78ab3b5a31c26
[transportpce.git] / common / src / main / java / org / opendaylight / transportpce / common / network / RequestProcessor.java
1 /*
2  * Copyright © 2017 Orange, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.transportpce.common.network;
9
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import java.util.concurrent.locks.ReentrantReadWriteLock;
13 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
14 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
15 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.yangtools.yang.binding.DataObject;
18 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 public class RequestProcessor {
23
24     private static final Logger LOG = LoggerFactory.getLogger(RequestProcessor.class);
25
26     private final DataBroker dataBroker;
27     private ReadWriteTransaction rwTx;
28     private ReadOnlyTransaction readTx;
29     private ReentrantReadWriteLock lock;
30
31
32
33     public RequestProcessor(DataBroker dataBroker) {
34         this.dataBroker = dataBroker;
35         rwTx = dataBroker.newReadWriteTransaction();
36         readTx = dataBroker.newReadOnlyTransaction();
37         lock = new ReentrantReadWriteLock();
38         LOG.info("RequestProcessor instantiated");
39
40     }
41
42     public <T extends DataObject> ListenableFuture<Optional<T>>
43          read(LogicalDatastoreType store,InstanceIdentifier<T> path) {
44
45         ListenableFuture<Optional<T>> result = null;
46         acquireReadLock();
47         LOG.debug("Number of threads in queue to read {}", lock.getQueueLength());
48         result = rwTx.read(store, path);
49
50         releaseReadLock();
51         return result;
52     }
53
54     public <T extends DataObject> void delete(LogicalDatastoreType store, InstanceIdentifier<?> path) {
55
56         acquireLock();
57         LOG.info("Number of delete requests waiting in queue :{}", lock.getQueueLength());
58         rwTx.delete(store, path);
59     }
60
61     public <T extends DataObject> void put(LogicalDatastoreType store,
62         InstanceIdentifier<T> path, T data, boolean createMissingParents) {
63
64         acquireLock();
65         LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
66         rwTx.put(store, path, data, createMissingParents);
67     }
68
69     public <T extends DataObject> void put(LogicalDatastoreType store,
70         InstanceIdentifier<T> path, T data) {
71
72         acquireLock();
73         LOG.debug("Number of put requests waiting in queue :{}", lock.getQueueLength());
74         rwTx.put(store, path, data);
75     }
76
77
78     public <T extends DataObject> void merge(LogicalDatastoreType store,
79         InstanceIdentifier<T> path, T data, boolean createMissingParents) {
80
81         acquireLock();
82         LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
83         rwTx.merge(store, path, data, createMissingParents);
84     }
85
86     public <T extends DataObject> void merge(LogicalDatastoreType store,
87         InstanceIdentifier<T> path, T data) {
88
89         acquireLock();
90         LOG.debug("Number of merge requests waiting in queue :{}", lock.getQueueLength());
91         rwTx.merge(store, path, data);
92     }
93
94     @Deprecated
95     public ListenableFuture<Void> submit() {
96         acquireLock();
97         ListenableFuture<Void> future = null;
98         future = rwTx.submit();
99         releaseLock();
100         resetRwTx();
101         return future;
102     }
103
104     public void close() {
105         releaseLock();
106     }
107
108     private void acquireLock() {
109         if (!lock.writeLock().isHeldByCurrentThread()) {
110             lock.writeLock().lock();
111             LOG.debug("Number of write lock requests waiting in queue :{}", lock.getQueueLength());
112             LOG.info("Write Lock acquired by : {}", Thread.currentThread().getName());
113             rwTx = resetRwTx();
114         } else {
115             LOG.debug("Lock already acquired by : {}", Thread.currentThread().getName());
116         }
117     }
118
119     private void acquireReadLock() {
120         if (lock.getReadHoldCount() > 0) {
121             LOG.info("Read Lock already acquired by : {}", Thread.currentThread().getName());
122         } else {
123             lock.readLock().lock();
124             rwTx = resetRwTx();
125             LOG.info("Read Lock acquired by : {}", Thread.currentThread().getName());
126         }
127     }
128
129     private void releaseLock() {
130         if (lock.writeLock().isHeldByCurrentThread()) {
131             LOG.info("Write Lock released by : {}", Thread.currentThread().getName());
132             lock.writeLock().unlock();
133         }
134     }
135
136     private void releaseReadLock() {
137         LOG.info("Read Lock released by : {}", Thread.currentThread().getName());
138         lock.readLock().unlock();
139     }
140
141     private ReadWriteTransaction resetRwTx() {
142         LOG.info("Resetting the read write transaction .....");
143         rwTx = dataBroker.newReadWriteTransaction();
144         return rwTx;
145     }
146 }