Merge "NECONF-524 : Setting the netconf keepalive logic to be more proactive."
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / ProxyReadWriteTransaction.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
9
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.util.Timeout;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Preconditions;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FluentFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.function.Consumer;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.controller.md.sal.common.api.MappingCheckedFuture;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncWriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import scala.concurrent.ExecutionContext;
37 import scala.concurrent.Future;
38
39 /**
40  * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
41  * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
42  */
43 public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction {
44     private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class);
45
46     private final RemoteDeviceId id;
47     private final AtomicBoolean opened = new AtomicBoolean(true);
48
49     @GuardedBy("queuedTxOperations")
50     private final List<Consumer<ProxyTransactionFacade>> queuedTxOperations = new ArrayList<>();
51
52     private volatile ProxyTransactionFacade transactionFacade;
53
54     public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
55             final ExecutionContext executionContext, final Timeout askTimeout) {
56         this.id = id;
57
58         masterTxActorFuture.onComplete(new OnComplete<Object>() {
59             @Override
60             public void onComplete(final Throwable failure, final Object masterTxActor) {
61                 final ProxyTransactionFacade newTransactionFacade;
62                 if (failure != null) {
63                     LOG.debug("{}: Failed to obtain master actor", id, failure);
64                     newTransactionFacade = new FailedProxyTransactionFacade(id, failure);
65                 } else {
66                     LOG.debug("{}: Obtained master actor {}", id, masterTxActor);
67                     newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id,
68                             executionContext, askTimeout);
69                 }
70
71                 executePriorTransactionOperations(newTransactionFacade);
72             }
73         }, executionContext);
74     }
75
76     @Override
77     public boolean cancel() {
78         if (!opened.compareAndSet(true, false)) {
79             return false;
80         }
81
82         processTransactionOperation(AsyncWriteTransaction::cancel);
83         return true;
84     }
85
86     @Override
87     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
88             final YangInstanceIdentifier path) {
89         LOG.debug("{}: Read {} {}", id, store, path);
90
91         final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
92         processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path)));
93         return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
94     }
95
96     @Override
97     public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
98             final YangInstanceIdentifier path) {
99         LOG.debug("{}: Exists {} {}", id, store, path);
100
101         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
102         processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path)));
103         return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
104     }
105
106     @Override
107     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
108         checkOpen();
109         LOG.debug("{}: Delete {} {}", id, store, path);
110         processTransactionOperation(facade -> facade.delete(store, path));
111     }
112
113     @Override
114     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
115                     final NormalizedNode<?, ?> data) {
116         checkOpen();
117         LOG.debug("{}: Put {} {}", id, store, path);
118         processTransactionOperation(facade -> facade.put(store, path, data));
119     }
120
121     @Override
122     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
123                       final NormalizedNode<?, ?> data) {
124         checkOpen();
125         LOG.debug("{}: Merge {} {}", id, store, path);
126         processTransactionOperation(facade -> facade.merge(store, path, data));
127     }
128
129     @Override
130     public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
131         Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id);
132         LOG.debug("{}: Commit", id);
133
134         final SettableFuture<CommitInfo> returnFuture = SettableFuture.create();
135         processTransactionOperation(facade -> returnFuture.setFuture(facade.commit()));
136         return returnFuture;
137     }
138
139     @Override
140     public Object getIdentifier() {
141         return id;
142     }
143
144     private void processTransactionOperation(final Consumer<ProxyTransactionFacade> operation) {
145         final ProxyTransactionFacade facadeOnEntry;
146         synchronized (queuedTxOperations) {
147             if (transactionFacade == null) {
148                 LOG.debug("{}: Queuing transaction operation", id);
149
150                 queuedTxOperations.add(operation);
151                 facadeOnEntry = null;
152             }  else {
153                 facadeOnEntry = transactionFacade;
154             }
155         }
156
157         if (facadeOnEntry != null) {
158             operation.accept(facadeOnEntry);
159         }
160     }
161
162     private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) {
163         while (true) {
164             // Access to queuedTxOperations and transactionFacade must be protected and atomic
165             // (ie synchronized) with respect to #processTransactionOperation to handle timing
166             // issues and ensure no ProxyTransactionFacade is missed and that they are processed
167             // in the order they occurred.
168
169             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
170             // in case a transaction operation results in another transaction operation being
171             // queued (eg a put operation from a client read Future callback that is notified
172             // synchronously).
173             final Collection<Consumer<ProxyTransactionFacade>> operationsBatch;
174             synchronized (queuedTxOperations) {
175                 if (queuedTxOperations.isEmpty()) {
176                     // We're done invoking the transaction operations so we can now publish the
177                     // ProxyTransactionFacade.
178                     transactionFacade = newTransactionFacade;
179                     break;
180                 }
181
182                 operationsBatch = new ArrayList<>(queuedTxOperations);
183                 queuedTxOperations.clear();
184             }
185
186             // Invoke transaction operations outside the sync block to avoid unnecessary blocking.
187             for (Consumer<ProxyTransactionFacade> oper : operationsBatch) {
188                 oper.accept(newTransactionFacade);
189             }
190         }
191     }
192
193     private void checkOpen() {
194         Preconditions.checkState(opened.get(), "%s: Transaction is closed", id);
195     }
196 }