Do not leak DataTree from backend actor
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import com.google.common.base.Verify;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Collection;
17 import java.util.Iterator;
18 import java.util.LinkedHashMap;
19 import java.util.Map;
20 import java.util.Optional;
21 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReentrantLock;
24 import java.util.function.Consumer;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
29 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
30 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
31 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
34 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
35 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
36 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
37 import org.opendaylight.controller.cluster.access.concepts.Request;
38 import org.opendaylight.controller.cluster.access.concepts.RequestException;
39 import org.opendaylight.controller.cluster.access.concepts.Response;
40 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
41 import org.opendaylight.yangtools.concepts.Identifiable;
42 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Per-connection representation of a local history. This class handles state replication across a single connection.
49  *
50  * @author Robert Varga
51  */
52 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
53     private abstract static class AbstractLocal extends ProxyHistory {
54         private final ReadOnlyDataTree dataTree;
55
56         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
57             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
58             super(parent, connection, identifier);
59             this.dataTree = requireNonNull(dataTree);
60         }
61
62         final DataTreeSnapshot takeSnapshot() {
63             return dataTree.takeSnapshot();
64         }
65     }
66
67     private abstract static class AbstractRemote extends ProxyHistory {
68         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
69             final LocalHistoryIdentifier identifier) {
70             super(parent, connection, identifier);
71         }
72     }
73
74     private static final class Local extends AbstractLocal {
75         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
76                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
77
78         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
79         // the open one and attempts to create a new transaction again.
80         private LocalReadWriteProxyTransaction lastOpen;
81
82         private volatile LocalReadWriteProxyTransaction lastSealed;
83
84         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
85             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
86             super(parent, connection, identifier, dataTree);
87         }
88
89         @Override
90         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
91                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
92             checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
93
94             if (isDone) {
95                 // Done transactions do not register on our radar on should not have any state associated.
96                 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
97                         : new LocalReadWriteProxyTransaction(this, txId);
98             }
99
100             // onTransactionCompleted() runs concurrently
101             final LocalReadWriteProxyTransaction localSealed = lastSealed;
102             final DataTreeSnapshot baseSnapshot;
103             if (localSealed != null) {
104                 baseSnapshot = localSealed.getSnapshot();
105             } else {
106                 baseSnapshot = takeSnapshot();
107             }
108
109             if (snapshotOnly) {
110                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
111             }
112
113             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
114             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
115             return lastOpen;
116         }
117
118         @Override
119         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
120             return createClient(parent(), connection, getIdentifier());
121         }
122
123         @Override
124         void onTransactionAborted(final AbstractProxyTransaction tx) {
125             if (tx.equals(lastOpen)) {
126                 lastOpen = null;
127             }
128         }
129
130         @Override
131         void onTransactionCompleted(final AbstractProxyTransaction tx) {
132             Verify.verify(tx instanceof LocalProxyTransaction);
133             if (tx instanceof LocalReadWriteProxyTransaction
134                     && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
135                 LOG.debug("Completed last sealed transaction {}", tx);
136             }
137         }
138
139         @Override
140         void onTransactionSealed(final AbstractProxyTransaction tx) {
141             checkState(tx.equals(lastOpen));
142             lastSealed = lastOpen;
143             lastOpen = null;
144         }
145     }
146
147     private static final class LocalSingle extends AbstractLocal {
148         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
149             final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
150             super(parent, connection, identifier, dataTree);
151         }
152
153         @Override
154         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
155                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
156             final DataTreeSnapshot snapshot = takeSnapshot();
157             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
158                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
159         }
160
161         @Override
162         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
163             return createSingle(parent(), connection, getIdentifier());
164         }
165     }
166
167     private static final class Remote extends AbstractRemote {
168         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
169             final LocalHistoryIdentifier identifier) {
170             super(parent, connection, identifier);
171         }
172
173         @Override
174         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
175                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
176             return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
177         }
178
179         @Override
180         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
181             return createClient(parent(), connection, getIdentifier());
182         }
183     }
184
185     private static final class RemoteSingle extends AbstractRemote {
186         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
187             final LocalHistoryIdentifier identifier) {
188             super(parent, connection, identifier);
189         }
190
191         @Override
192         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
193                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
194             return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
195         }
196
197         @Override
198         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
199             return createSingle(parent(), connection, getIdentifier());
200         }
201     }
202
203     private static final class RequestReplayException extends RequestException {
204         private static final long serialVersionUID = 1L;
205
206         RequestReplayException(final String format, final Object... args) {
207             super(String.format(format, args));
208         }
209
210         @Override
211         public boolean isRetriable() {
212             return false;
213         }
214     }
215
216     private final class ReconnectCohort extends ProxyReconnectCohort {
217         @Override
218         public LocalHistoryIdentifier getIdentifier() {
219             return identifier;
220         }
221
222         @Holding("lock")
223         @Override
224         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
225             // First look for our Create message
226             Iterator<ConnectionEntry> it = previousEntries.iterator();
227             while (it.hasNext()) {
228                 final ConnectionEntry e = it.next();
229                 final Request<?, ?> req = e.getRequest();
230                 if (identifier.equals(req.getTarget())) {
231                     Verify.verify(req instanceof LocalHistoryRequest);
232                     if (req instanceof CreateLocalHistoryRequest) {
233                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
234                         it.remove();
235                         break;
236                     }
237                 }
238             }
239
240             for (AbstractProxyTransaction t : proxies.values()) {
241                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
242                 t.replayMessages(successor, previousEntries);
243             }
244
245             // Now look for any finalizing messages
246             it = previousEntries.iterator();
247             while (it.hasNext()) {
248                 final ConnectionEntry e  = it.next();
249                 final Request<?, ?> req = e.getRequest();
250                 if (identifier.equals(req.getTarget())) {
251                     Verify.verify(req instanceof LocalHistoryRequest);
252                     if (req instanceof DestroyLocalHistoryRequest) {
253                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
254                         it.remove();
255                         break;
256                     }
257                 }
258             }
259         }
260
261         @GuardedBy("lock")
262         @Override
263         ProxyHistory finishReconnect() {
264             final ProxyHistory ret = Verify.verifyNotNull(successor);
265
266             for (AbstractProxyTransaction t : proxies.values()) {
267                 t.finishReconnect();
268             }
269
270             LOG.debug("Finished reconnecting proxy history {}", this);
271             lock.unlock();
272             return ret;
273         }
274
275         @Override
276         void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
277                 throws RequestException {
278             final Request<?, ?> request = entry.getRequest();
279             if (request instanceof TransactionRequest) {
280                 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
281                     entry.getEnqueuedTicks());
282             } else if (request instanceof LocalHistoryRequest) {
283                 replayTo.accept(entry);
284             } else {
285                 throw new IllegalArgumentException("Unhandled request " + request);
286             }
287         }
288
289         @Override
290         void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
291                 throws RequestException {
292             final Request<?, ?> request = entry.getRequest();
293             if (request instanceof TransactionRequest) {
294                 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
295             } else if (request instanceof LocalHistoryRequest) {
296                 forwardTo.accept(entry);
297             } else {
298                 throw new IllegalArgumentException("Unhandled request " + request);
299             }
300         }
301
302         private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
303                 throws RequestReplayException {
304             final AbstractProxyTransaction proxy;
305             lock.lock();
306             try {
307                 proxy = proxies.get(request.getTarget());
308             } finally {
309                 lock.unlock();
310             }
311             if (proxy != null) {
312                 return proxy;
313             }
314
315             throw new RequestReplayException("Failed to find proxy for %s", request);
316         }
317     }
318
319     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
320
321     private final Lock lock = new ReentrantLock();
322     private final LocalHistoryIdentifier identifier;
323     private final AbstractClientConnection<ShardBackendInfo> connection;
324     private final AbstractClientHistory parent;
325
326     @GuardedBy("lock")
327     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
328     @GuardedBy("lock")
329     private ProxyHistory successor;
330
331     private ProxyHistory(final AbstractClientHistory parent,
332             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
333         this.parent = requireNonNull(parent);
334         this.connection = requireNonNull(connection);
335         this.identifier = requireNonNull(identifier);
336     }
337
338     static ProxyHistory createClient(final AbstractClientHistory parent,
339             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
340         final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
341         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
342              : new Remote(parent, connection, identifier);
343     }
344
345     static ProxyHistory createSingle(final AbstractClientHistory parent,
346             final AbstractClientConnection<ShardBackendInfo> connection,
347             final LocalHistoryIdentifier identifier) {
348         final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
349         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
350              : new RemoteSingle(parent, connection, identifier);
351     }
352
353     @Override
354     public LocalHistoryIdentifier getIdentifier() {
355         return identifier;
356     }
357
358     final ClientActorContext context() {
359         return connection.context();
360     }
361
362     final long currentTime() {
363         return connection.currentTime();
364     }
365
366     final ActorRef localActor() {
367         return connection.localActor();
368     }
369
370     final AbstractClientHistory parent() {
371         return parent;
372     }
373
374     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
375             final boolean snapshotOnly) {
376         return createTransactionProxy(txId, snapshotOnly, false);
377     }
378
379     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
380             final boolean isDone) {
381         lock.lock();
382         try {
383             if (successor != null) {
384                 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
385             }
386
387             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
388             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
389             proxies.put(proxyId, ret);
390             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
391             return ret;
392         } finally {
393             lock.unlock();
394         }
395     }
396
397     final void abortTransaction(final AbstractProxyTransaction tx) {
398         lock.lock();
399         try {
400             // Removal will be completed once purge completes
401             LOG.debug("Proxy {} aborted transaction {}", this, tx);
402             onTransactionAborted(tx);
403         } finally {
404             lock.unlock();
405         }
406     }
407
408     final void completeTransaction(final AbstractProxyTransaction tx) {
409         lock.lock();
410         try {
411             // Removal will be completed once purge completes
412             LOG.debug("Proxy {} completing transaction {}", this, tx);
413             onTransactionCompleted(tx);
414         } finally {
415             lock.unlock();
416         }
417     }
418
419     void purgeTransaction(final AbstractProxyTransaction tx) {
420         lock.lock();
421         try {
422             proxies.remove(tx.getIdentifier());
423             LOG.debug("Proxy {} purged transaction {}", this, tx);
424         } finally {
425             lock.unlock();
426         }
427     }
428
429     final void close() {
430         lock.lock();
431         try {
432             if (successor != null) {
433                 successor.close();
434                 return;
435             }
436
437             LOG.debug("Proxy {} invoking destroy", this);
438             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
439                 this::onDestroyComplete);
440         } finally {
441             lock.unlock();
442         }
443     }
444
445     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
446             final long enqueuedTicks) {
447         connection.enqueueRequest(request, callback, enqueuedTicks);
448     }
449
450     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
451         connection.sendRequest(request, callback);
452     }
453
454     @GuardedBy("lock")
455     @SuppressWarnings("checkstyle:hiddenField")
456     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
457             TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
458
459     @SuppressWarnings("checkstyle:hiddenField")
460     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
461
462     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
463     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
464         lock.lock();
465         if (successor != null) {
466             lock.unlock();
467             throw new IllegalStateException("Proxy history " + this + " already has a successor");
468         }
469
470         successor = createSuccessor(newConnection);
471         LOG.debug("History {} instantiated successor {}", this, successor);
472
473         for (AbstractProxyTransaction t : proxies.values()) {
474             t.startReconnect();
475         }
476
477         return new ReconnectCohort();
478     }
479
480     private void onDestroyComplete(final Response<?, ?> response) {
481         LOG.debug("Proxy {} destroy completed with {}", this, response);
482
483         lock.lock();
484         try {
485             parent.onProxyDestroyed(this);
486             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
487                 this::onPurgeComplete);
488         } finally {
489             lock.unlock();
490         }
491     }
492
493     private void onPurgeComplete(final Response<?, ?> response) {
494         LOG.debug("Proxy {} purge completed with {}", this, response);
495     }
496
497     @Holding("lock")
498     void onTransactionAborted(final AbstractProxyTransaction tx) {
499         // No-op for most implementations
500     }
501
502     @Holding("lock")
503     void onTransactionCompleted(final AbstractProxyTransaction tx) {
504         // No-op for most implementations
505     }
506
507     void onTransactionSealed(final AbstractProxyTransaction tx) {
508         // No-op on most implementations
509     }
510 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.