BUG-5280: fix problems identified by integration tests
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
27 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
28 import org.opendaylight.controller.cluster.access.concepts.Request;
29 import org.opendaylight.controller.cluster.access.concepts.RequestException;
30 import org.opendaylight.controller.cluster.access.concepts.Response;
31 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Per-connection representation of a local history. This class handles state replication across a single connection.
41  *
42  * @author Robert Varga
43  */
44 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
45     private abstract static class AbstractLocal extends ProxyHistory {
46         private final DataTree dataTree;
47
48         AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
49             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
50             super(connection, identifier);
51             this.dataTree = Preconditions.checkNotNull(dataTree);
52         }
53
54         final DataTreeSnapshot takeSnapshot() {
55             return dataTree.takeSnapshot();
56         }
57     }
58
59     private abstract static class AbstractRemote extends ProxyHistory {
60         AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
61             final LocalHistoryIdentifier identifier) {
62             super(connection, identifier);
63         }
64
65         @Override
66         final AbstractProxyTransaction doCreateTransactionProxy(
67                 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
68             return new RemoteProxyTransaction(this, txId);
69         }
70     }
71
72     private static final class Local extends AbstractLocal {
73         private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
74                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
75
76         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77         // the open one and attempts to create a new transaction again.
78         private LocalProxyTransaction lastOpen;
79
80         private volatile LocalProxyTransaction lastSealed;
81
82         Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
83             final DataTree dataTree) {
84             super(connection, identifier, dataTree);
85         }
86
87         @Override
88         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89                 final TransactionIdentifier txId) {
90             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
91
92             // onTransactionCompleted() runs concurrently
93             final LocalProxyTransaction localSealed = lastSealed;
94             final DataTreeSnapshot baseSnapshot;
95             if (localSealed != null) {
96                 baseSnapshot = localSealed.getSnapshot();
97             } else {
98                 baseSnapshot = takeSnapshot();
99             }
100
101             lastOpen = new LocalProxyTransaction(this, txId,
102                 (CursorAwareDataTreeModification) baseSnapshot.newModification());
103             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
104             return lastOpen;
105         }
106
107         @Override
108         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
109             return createClient(connection, getIdentifier());
110         }
111
112         @Override
113         void onTransactionAborted(final AbstractProxyTransaction tx) {
114             Preconditions.checkState(tx.equals(lastOpen));
115             lastOpen = null;
116         }
117
118         @Override
119         void onTransactionCompleted(final AbstractProxyTransaction tx) {
120             Verify.verify(tx instanceof LocalProxyTransaction);
121
122             if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
123                 LOG.debug("Completed last sealed transaction {}", tx);
124             }
125         }
126
127         @Override
128         void onTransactionSealed(final AbstractProxyTransaction tx) {
129             Preconditions.checkState(tx.equals(lastOpen));
130             lastSealed = lastOpen;
131             lastOpen = null;
132         }
133     }
134
135     private static final class LocalSingle extends AbstractLocal {
136         LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
137             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
138             super(connection, identifier, dataTree);
139         }
140
141         @Override
142         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
143                 final TransactionIdentifier txId) {
144             return new LocalProxyTransaction(this, txId,
145                 (CursorAwareDataTreeModification) takeSnapshot().newModification());
146         }
147
148         @Override
149         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
150             return createSingle(connection, getIdentifier());
151         }
152     }
153
154     private static final class Remote extends AbstractRemote {
155         Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
156             super(connection, identifier);
157         }
158
159         @Override
160         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
161             return createClient(connection, getIdentifier());
162         }
163     }
164
165     private static final class RemoteSingle extends AbstractRemote {
166         RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
167             final LocalHistoryIdentifier identifier) {
168             super(connection, identifier);
169         }
170
171         @Override
172         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
173             return createSingle(connection, getIdentifier());
174         }
175     }
176
177     private static final class RequestReplayException extends RequestException {
178         private static final long serialVersionUID = 1L;
179
180         RequestReplayException(final String format, final Object... args) {
181             super(String.format(format, args));
182         }
183
184         @Override
185         public boolean isRetriable() {
186             return false;
187         }
188     }
189
190     private final class ReconnectCohort extends ProxyReconnectCohort {
191         @Override
192         public LocalHistoryIdentifier getIdentifier() {
193             return identifier;
194         }
195
196         @GuardedBy("lock")
197         @Override
198         void replaySuccessfulRequests() {
199             for (AbstractProxyTransaction t : proxies.values()) {
200                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
201                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
202                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
203                 t.replaySuccessfulRequests(newProxy);
204             }
205         }
206
207         @GuardedBy("lock")
208         @Override
209         ProxyHistory finishReconnect() {
210             final ProxyHistory ret = Verify.verifyNotNull(successor);
211             LOG.debug("Finished reconnecting proxy history {}", this);
212             lock.unlock();
213             return ret;
214         }
215
216         @Override
217         void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
218                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
219             if (request instanceof TransactionRequest) {
220                 replayTransactionRequest((TransactionRequest<?>) request, callback);
221             } else if (request instanceof LocalHistoryRequest) {
222                 replayTo.accept(request, callback);
223             } else {
224                 throw new IllegalArgumentException("Unhandled request " + request);
225             }
226         }
227
228         private void replayTransactionRequest(final TransactionRequest<?> request,
229                 final Consumer<Response<?, ?>> callback) throws RequestException {
230
231             final AbstractProxyTransaction proxy;
232             lock.lock();
233             try {
234                 proxy = proxies.get(request.getTarget());
235             } finally {
236                 lock.unlock();
237             }
238             if (proxy == null) {
239                 throw new RequestReplayException("Failed to find proxy for %s", request);
240             }
241
242             proxy.replayRequest(request, callback);
243         }
244     }
245
246     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
247
248     private final Lock lock = new ReentrantLock();
249     private final LocalHistoryIdentifier identifier;
250     private final AbstractClientConnection<ShardBackendInfo> connection;
251
252     @GuardedBy("lock")
253     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
254     @GuardedBy("lock")
255     private ProxyHistory successor;
256
257     private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
258             final LocalHistoryIdentifier identifier) {
259         this.connection = Preconditions.checkNotNull(connection);
260         this.identifier = Preconditions.checkNotNull(identifier);
261     }
262
263     static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
264             final LocalHistoryIdentifier identifier) {
265         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
266         return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
267              : new Remote(connection, identifier);
268     }
269
270     static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
271             final LocalHistoryIdentifier identifier) {
272         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
273         return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
274              : new RemoteSingle(connection, identifier);
275     }
276
277     @Override
278     public LocalHistoryIdentifier getIdentifier() {
279         return identifier;
280     }
281
282     final ActorRef localActor() {
283         return connection.localActor();
284     }
285
286     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
287         lock.lock();
288         try {
289             if (successor != null) {
290                 return successor.createTransactionProxy(txId);
291             }
292
293             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
294             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
295             proxies.put(proxyId, ret);
296             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
297             return ret;
298         } finally {
299             lock.unlock();
300         }
301     }
302
303     final void abortTransaction(final AbstractProxyTransaction tx) {
304         lock.lock();
305         try {
306             proxies.remove(tx.getIdentifier());
307             LOG.debug("Proxy {} aborting transaction {}", this, tx);
308             onTransactionAborted(tx);
309         } finally {
310             lock.unlock();
311         }
312     }
313
314     final void completeTransaction(final AbstractProxyTransaction tx) {
315         lock.lock();
316         try {
317             proxies.remove(tx.getIdentifier());
318             LOG.debug("Proxy {} completing transaction {}", this, tx);
319             onTransactionCompleted(tx);
320         } finally {
321             lock.unlock();
322         }
323     }
324
325     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
326         connection.sendRequest(request, callback);
327     }
328
329     @GuardedBy("lock")
330     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
331             TransactionIdentifier txId);
332
333     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
334
335     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
336     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
337         lock.lock();
338         if (successor != null) {
339             lock.unlock();
340             throw new IllegalStateException("Proxy history " + this + " already has a successor");
341         }
342
343         successor = createSuccessor(newConnection);
344         LOG.debug("History {} instantiated successor {}", this, successor);
345         return new ReconnectCohort();
346     }
347
348     @GuardedBy("lock")
349     void onTransactionAborted(final AbstractProxyTransaction tx) {
350         // No-op for most implementations
351     }
352
353     @GuardedBy("lock")
354     void onTransactionCompleted(final AbstractProxyTransaction tx) {
355         // No-op for most implementations
356     }
357
358     void onTransactionSealed(final AbstractProxyTransaction tx) {
359         // No-op on most implementations
360     }
361 }