2 * Copyright © 2017 Orange, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.transportpce.common.network;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.util.concurrent.FluentFuture;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.HashMap;
17 import java.util.Optional;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantReadWriteLock;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.mdsal.binding.api.DataBroker;
22 import org.opendaylight.mdsal.binding.api.ReadTransaction;
23 import org.opendaylight.mdsal.binding.api.WriteTransaction;
24 import org.opendaylight.mdsal.common.api.CommitInfo;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
27 import org.opendaylight.yangtools.yang.binding.DataObject;
28 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK_EXCEPTION_PATH",
33 justification = "This appears to be doing exactly the right thing with the finally-clause to release the lock")
34 public class RequestProcessor {
36 private static final Logger LOG = LoggerFactory.getLogger(RequestProcessor.class);
38 private final DataBroker dataBroker;
39 private final ReentrantReadWriteLock rwL = new ReentrantReadWriteLock();
40 private final Lock readL = rwL.readLock();
41 private final Lock writeL = rwL.writeLock();
42 private Map<String, WriteTransaction> writeTrMap = new HashMap<>();
/**
 * Creates a request processor that opens its transactions on the given broker.
 *
 * @param dataBroker the data broker to use; must not be {@code null}
 * @throws NullPointerException if {@code dataBroker} is {@code null}
 */
public RequestProcessor(DataBroker dataBroker) {
    // Name the offending argument so a failure is self-explanatory in logs.
    this.dataBroker = requireNonNull(dataBroker, "dataBroker");
    LOG.info("RequestProcessor instantiated");
}
50 public <T extends DataObject> ListenableFuture<Optional<T>> read(LogicalDatastoreType store,
51 InstanceIdentifier<T> path) {
52 ReadTransaction readTx = dataBroker.newReadOnlyTransaction();
53 String thread = Thread.currentThread().getName();
55 LOG.debug("read locked {} by {}", store, thread);
57 return readTx.read(store, path);
62 LOG.debug("read after unlock - {}", thread);
66 public <T extends DataObject> void delete(LogicalDatastoreType store, InstanceIdentifier<?> path) {
67 String thread = Thread.currentThread().getName();
68 LOG.debug("delete - store, thread = {} - {}", store, thread);
70 LOG.debug("delete locked by {}", thread);
72 if (!writeTrMap.containsKey(thread)) {
73 writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
75 writeTrMap.get(thread).delete(store, path);
78 LOG.debug("delete before unlock - {}", thread);
80 LOG.debug("delete after unlock1 - {}", Thread.currentThread().getName());
81 LOG.debug("delete after unlock2 - {}", thread);
85 public <T extends DataObject> void put(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
86 String thread = Thread.currentThread().getName();
88 LOG.debug("put locked {} by {}", store, thread);
90 if (!writeTrMap.containsKey(thread)) {
91 writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
93 writeTrMap.get(thread).put(store, path, data);
97 LOG.debug("put after unlock - {}", thread);
101 public <T extends DataObject> void merge(LogicalDatastoreType store, InstanceIdentifier<T> path, T data) {
102 String thread = Thread.currentThread().getName();
104 LOG.debug("merge locked {} by {}", store, thread);
106 if (!writeTrMap.containsKey(thread)) {
107 writeTrMap.put(thread, dataBroker.newWriteOnlyTransaction());
109 writeTrMap.get(thread).merge(store, path, data);
113 LOG.debug("merge after unlock - {}", thread);
117 public FluentFuture<? extends @NonNull CommitInfo> commit() {
118 String thread = Thread.currentThread().getName();
120 LOG.debug("commit locked by {}", thread);
122 if (writeTrMap.containsKey(thread)) {
123 return writeTrMap.get(thread).commit();
125 LOG.warn("No write transaction available for thread {}", thread);
126 return FluentFutures.immediateNullFluentFuture();
130 writeTrMap.remove(thread);
132 LOG.debug("commit after unlock - {}", thread);
137 * Return the dataBroker related to RequestProcessor.
138 * @return the dataBroker
140 public DataBroker getDataBroker() {