Simplify code using Java 8 features
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / ProxyReadWriteTransaction.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.tx;
10
11 import akka.actor.ActorRef;
12 import akka.dispatch.OnComplete;
13 import akka.util.Timeout;
14 import com.google.common.base.Optional;
15 import com.google.common.base.Preconditions;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FluentFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncWriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
32 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import scala.concurrent.ExecutionContext;
38 import scala.concurrent.Future;
39
40 /**
41  * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
42  * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
43  */
44 public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction {
45     private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class);
46
47     private final RemoteDeviceId id;
48     private final AtomicBoolean opened = new AtomicBoolean(true);
49
50     @GuardedBy("queuedTxOperations")
51     private final List<Consumer<ProxyTransactionFacade>> queuedTxOperations = new ArrayList<>();
52
53     private volatile ProxyTransactionFacade transactionFacade;
54
55     public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
56             final ExecutionContext executionContext, final Timeout askTimeout) {
57         this.id = id;
58
59         masterTxActorFuture.onComplete(new OnComplete<Object>() {
60             @Override
61             public void onComplete(final Throwable failure, final Object masterTxActor) {
62                 final ProxyTransactionFacade newTransactionFacade;
63                 if (failure != null) {
64                     LOG.debug("{}: Failed to obtain master actor", id, failure);
65                     newTransactionFacade = new FailedProxyTransactionFacade(id, failure);
66                 } else {
67                     LOG.debug("{}: Obtained master actor {}", id, masterTxActor);
68                     newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id,
69                             executionContext, askTimeout);
70                 }
71
72                 executePriorTransactionOperations(newTransactionFacade);
73             }
74         }, executionContext);
75     }
76
77     @Override
78     public boolean cancel() {
79         if (!opened.compareAndSet(true, false)) {
80             return false;
81         }
82
83         processTransactionOperation(AsyncWriteTransaction::cancel);
84         return true;
85     }
86
87     @Override
88     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
89             final YangInstanceIdentifier path) {
90         LOG.debug("{}: Read {} {}", id, store, path);
91
92         final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
93         processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path)));
94         return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
95     }
96
97     @Override
98     public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
99             final YangInstanceIdentifier path) {
100         LOG.debug("{}: Exists {} {}", id, store, path);
101
102         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
103         processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path)));
104         return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
105     }
106
107     @Override
108     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
109         checkOpen();
110         LOG.debug("{}: Delete {} {}", id, store, path);
111         processTransactionOperation(facade -> facade.delete(store, path));
112     }
113
114     @Override
115     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
116                     final NormalizedNode<?, ?> data) {
117         checkOpen();
118         LOG.debug("{}: Put {} {}", id, store, path);
119         processTransactionOperation(facade -> facade.put(store, path, data));
120     }
121
122     @Override
123     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
124                       final NormalizedNode<?, ?> data) {
125         checkOpen();
126         LOG.debug("{}: Merge {} {}", id, store, path);
127         processTransactionOperation(facade -> facade.merge(store, path, data));
128     }
129
130     @Override
131     public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
132         Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id);
133         LOG.debug("{}: Commit", id);
134
135         final SettableFuture<CommitInfo> returnFuture = SettableFuture.create();
136         processTransactionOperation(facade -> returnFuture.setFuture(facade.commit()));
137         return returnFuture;
138     }
139
140     @Override
141     public Object getIdentifier() {
142         return id;
143     }
144
145     private void processTransactionOperation(final Consumer<ProxyTransactionFacade> operation) {
146         final ProxyTransactionFacade facadeOnEntry;
147         synchronized (queuedTxOperations) {
148             if (transactionFacade == null) {
149                 LOG.debug("{}: Queuing transaction operation", id);
150
151                 queuedTxOperations.add(operation);
152                 facadeOnEntry = null;
153             }  else {
154                 facadeOnEntry = transactionFacade;
155             }
156         }
157
158         if (facadeOnEntry != null) {
159             operation.accept(facadeOnEntry);
160         }
161     }
162
163     private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) {
164         while (true) {
165             // Access to queuedTxOperations and transactionFacade must be protected and atomic
166             // (ie synchronized) with respect to #processTransactionOperation to handle timing
167             // issues and ensure no ProxyTransactionFacade is missed and that they are processed
168             // in the order they occurred.
169
170             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
171             // in case a transaction operation results in another transaction operation being
172             // queued (eg a put operation from a client read Future callback that is notified
173             // synchronously).
174             final Collection<Consumer<ProxyTransactionFacade>> operationsBatch;
175             synchronized (queuedTxOperations) {
176                 if (queuedTxOperations.isEmpty()) {
177                     // We're done invoking the transaction operations so we can now publish the
178                     // ProxyTransactionFacade.
179                     transactionFacade = newTransactionFacade;
180                     break;
181                 }
182
183                 operationsBatch = new ArrayList<>(queuedTxOperations);
184                 queuedTxOperations.clear();
185             }
186
187             // Invoke transaction operations outside the sync block to avoid unnecessary blocking.
188             for (Consumer<ProxyTransactionFacade> oper : operationsBatch) {
189                 oper.accept(newTransactionFacade);
190             }
191         }
192     }
193
194     private void checkOpen() {
195         Preconditions.checkState(opened.get(), "%s: Transaction is closed", id);
196     }
197 }