BUG-8494: fix throttling during reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.Consumer;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
26 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
27 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
28 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
33 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
34 import org.opendaylight.controller.cluster.access.concepts.Request;
35 import org.opendaylight.controller.cluster.access.concepts.RequestException;
36 import org.opendaylight.controller.cluster.access.concepts.Response;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.yangtools.concepts.Identifiable;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Per-connection representation of a local history. This class handles state replication across a single connection.
46  *
47  * @author Robert Varga
48  */
49 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
50     private abstract static class AbstractLocal extends ProxyHistory {
51         private final DataTree dataTree;
52
53         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
54             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
55             super(parent, connection, identifier);
56             this.dataTree = Preconditions.checkNotNull(dataTree);
57         }
58
59         final DataTreeSnapshot takeSnapshot() {
60             return dataTree.takeSnapshot();
61         }
62     }
63
64     private abstract static class AbstractRemote extends ProxyHistory {
65         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
66             final LocalHistoryIdentifier identifier) {
67             super(parent, connection, identifier);
68         }
69     }
70
71     private static final class Local extends AbstractLocal {
72         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
73                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
74
75         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
76         // the open one and attempts to create a new transaction again.
77         private LocalReadWriteProxyTransaction lastOpen;
78
79         private volatile LocalReadWriteProxyTransaction lastSealed;
80
81         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
82             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
83             super(parent, connection, identifier, dataTree);
84         }
85
86         @Override
87         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
88                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
89             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
90
91             if (isDone) {
92                 // Done transactions do not register on our radar on should not have any state associated.
93                 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
94                         : new LocalReadWriteProxyTransaction(this, txId);
95             }
96
97             // onTransactionCompleted() runs concurrently
98             final LocalReadWriteProxyTransaction localSealed = lastSealed;
99             final DataTreeSnapshot baseSnapshot;
100             if (localSealed != null) {
101                 baseSnapshot = localSealed.getSnapshot();
102             } else {
103                 baseSnapshot = takeSnapshot();
104             }
105
106             if (snapshotOnly) {
107                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
108             }
109
110             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
111             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
112             return lastOpen;
113         }
114
115         @Override
116         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
117             return createClient(parent(), connection, getIdentifier());
118         }
119
120         @Override
121         void onTransactionAborted(final AbstractProxyTransaction tx) {
122             if (tx.equals(lastOpen)) {
123                 lastOpen = null;
124             }
125         }
126
127         @Override
128         void onTransactionCompleted(final AbstractProxyTransaction tx) {
129             Verify.verify(tx instanceof LocalProxyTransaction);
130             if (tx instanceof LocalReadWriteProxyTransaction) {
131                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
132                     LOG.debug("Completed last sealed transaction {}", tx);
133                 }
134             }
135         }
136
137         @Override
138         void onTransactionSealed(final AbstractProxyTransaction tx) {
139             Preconditions.checkState(tx.equals(lastOpen));
140             lastSealed = lastOpen;
141             lastOpen = null;
142         }
143     }
144
145     private static final class LocalSingle extends AbstractLocal {
146         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
147             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
148             super(parent, connection, identifier, dataTree);
149         }
150
151         @Override
152         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
153                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
154             final DataTreeSnapshot snapshot = takeSnapshot();
155             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
156                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
157         }
158
159         @Override
160         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
161             return createSingle(parent(), connection, getIdentifier());
162         }
163     }
164
165     private static final class Remote extends AbstractRemote {
166         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
167             final LocalHistoryIdentifier identifier) {
168             super(parent, connection, identifier);
169         }
170
171         @Override
172         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
173                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
174             return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
175         }
176
177         @Override
178         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
179             return createClient(parent(), connection, getIdentifier());
180         }
181     }
182
183     private static final class RemoteSingle extends AbstractRemote {
184         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
185             final LocalHistoryIdentifier identifier) {
186             super(parent, connection, identifier);
187         }
188
189         @Override
190         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
191                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
192             return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
193         }
194
195         @Override
196         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
197             return createSingle(parent(), connection, getIdentifier());
198         }
199     }
200
201     private static final class RequestReplayException extends RequestException {
202         private static final long serialVersionUID = 1L;
203
204         RequestReplayException(final String format, final Object... args) {
205             super(String.format(format, args));
206         }
207
208         @Override
209         public boolean isRetriable() {
210             return false;
211         }
212     }
213
214     private final class ReconnectCohort extends ProxyReconnectCohort {
215         @Override
216         public LocalHistoryIdentifier getIdentifier() {
217             return identifier;
218         }
219
220         @GuardedBy("lock")
221         @Override
222         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
223             // First look for our Create message
224             Iterator<ConnectionEntry> it = previousEntries.iterator();
225             while (it.hasNext()) {
226                 final ConnectionEntry e = it.next();
227                 final Request<?, ?> req = e.getRequest();
228                 if (identifier.equals(req.getTarget())) {
229                     Verify.verify(req instanceof LocalHistoryRequest);
230                     if (req instanceof CreateLocalHistoryRequest) {
231                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
232                         it.remove();
233                         break;
234                     }
235                 }
236             }
237
238             for (AbstractProxyTransaction t : proxies.values()) {
239                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
240                 t.replayMessages(successor, previousEntries);
241             }
242
243             // Now look for any finalizing messages
244             it = previousEntries.iterator();
245             while (it.hasNext()) {
246                 final ConnectionEntry e  = it.next();
247                 final Request<?, ?> req = e.getRequest();
248                 if (identifier.equals(req.getTarget())) {
249                     Verify.verify(req instanceof LocalHistoryRequest);
250                     if (req instanceof DestroyLocalHistoryRequest) {
251                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
252                         it.remove();
253                         break;
254                     }
255                 }
256             }
257         }
258
259         @GuardedBy("lock")
260         @Override
261         ProxyHistory finishReconnect() {
262             final ProxyHistory ret = Verify.verifyNotNull(successor);
263
264             for (AbstractProxyTransaction t : proxies.values()) {
265                 t.finishReconnect();
266             }
267
268             LOG.debug("Finished reconnecting proxy history {}", this);
269             lock.unlock();
270             return ret;
271         }
272
273         @Override
274         void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
275                 throws RequestException {
276             final Request<?, ?> request = entry.getRequest();
277             if (request instanceof TransactionRequest) {
278                 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
279                     entry.getEnqueuedTicks());
280             } else if (request instanceof LocalHistoryRequest) {
281                 replayTo.accept(entry);
282             } else {
283                 throw new IllegalArgumentException("Unhandled request " + request);
284             }
285         }
286
287         @Override
288         void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
289                 throws RequestException {
290             final Request<?, ?> request = entry.getRequest();
291             if (request instanceof TransactionRequest) {
292                 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
293             } else if (request instanceof LocalHistoryRequest) {
294                 forwardTo.accept(entry);
295             } else {
296                 throw new IllegalArgumentException("Unhandled request " + request);
297             }
298         }
299
300         private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
301                 throws RequestReplayException {
302             final AbstractProxyTransaction proxy;
303             lock.lock();
304             try {
305                 proxy = proxies.get(request.getTarget());
306             } finally {
307                 lock.unlock();
308             }
309             if (proxy != null) {
310                 return proxy;
311             }
312
313             throw new RequestReplayException("Failed to find proxy for %s", request);
314         }
315     }
316
317     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
318
319     private final Lock lock = new ReentrantLock();
320     private final LocalHistoryIdentifier identifier;
321     private final AbstractClientConnection<ShardBackendInfo> connection;
322     private final AbstractClientHistory parent;
323
324     @GuardedBy("lock")
325     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
326     @GuardedBy("lock")
327     private ProxyHistory successor;
328
329     private ProxyHistory(final AbstractClientHistory parent,
330             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
331         this.parent = Preconditions.checkNotNull(parent);
332         this.connection = Preconditions.checkNotNull(connection);
333         this.identifier = Preconditions.checkNotNull(identifier);
334     }
335
336     static ProxyHistory createClient(final AbstractClientHistory parent,
337             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
338         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
339         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
340              : new Remote(parent, connection, identifier);
341     }
342
343     static ProxyHistory createSingle(final AbstractClientHistory parent,
344             final AbstractClientConnection<ShardBackendInfo> connection,
345             final LocalHistoryIdentifier identifier) {
346         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
347         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
348              : new RemoteSingle(parent, connection, identifier);
349     }
350
351     @Override
352     public LocalHistoryIdentifier getIdentifier() {
353         return identifier;
354     }
355
356     final ClientActorContext context() {
357         return connection.context();
358     }
359
360     final long currentTime() {
361         return connection.currentTime();
362     }
363
364     final ActorRef localActor() {
365         return connection.localActor();
366     }
367
368     final AbstractClientHistory parent() {
369         return parent;
370     }
371
372     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
373             final boolean snapshotOnly) {
374         return createTransactionProxy(txId, snapshotOnly, false);
375     }
376
377     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
378             final boolean isDone) {
379         lock.lock();
380         try {
381             if (successor != null) {
382                 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
383             }
384
385             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
386             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
387             proxies.put(proxyId, ret);
388             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
389             return ret;
390         } finally {
391             lock.unlock();
392         }
393     }
394
395     final void abortTransaction(final AbstractProxyTransaction tx) {
396         lock.lock();
397         try {
398             // Removal will be completed once purge completes
399             LOG.debug("Proxy {} aborted transaction {}", this, tx);
400             onTransactionAborted(tx);
401         } finally {
402             lock.unlock();
403         }
404     }
405
406     final void completeTransaction(final AbstractProxyTransaction tx) {
407         lock.lock();
408         try {
409             // Removal will be completed once purge completes
410             LOG.debug("Proxy {} completing transaction {}", this, tx);
411             onTransactionCompleted(tx);
412         } finally {
413             lock.unlock();
414         }
415     }
416
417     void purgeTransaction(final AbstractProxyTransaction tx) {
418         lock.lock();
419         try {
420             proxies.remove(tx.getIdentifier());
421             LOG.debug("Proxy {} purged transaction {}", this, tx);
422         } finally {
423             lock.unlock();
424         }
425     }
426
427     final void close() {
428         lock.lock();
429         try {
430             if (successor != null) {
431                 successor.close();
432                 return;
433             }
434
435             LOG.debug("Proxy {} invoking destroy", this);
436             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
437                 this::onDestroyComplete);
438         } finally {
439             lock.unlock();
440         }
441     }
442
443     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
444             final long enqueuedTicks) {
445         connection.enqueueRequest(request, callback, enqueuedTicks);
446     }
447
448     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
449         connection.sendRequest(request, callback);
450     }
451
452     @GuardedBy("lock")
453     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
454             TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
455
456     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
457
458     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
459     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
460         lock.lock();
461         if (successor != null) {
462             lock.unlock();
463             throw new IllegalStateException("Proxy history " + this + " already has a successor");
464         }
465
466         successor = createSuccessor(newConnection);
467         LOG.debug("History {} instantiated successor {}", this, successor);
468
469         for (AbstractProxyTransaction t : proxies.values()) {
470             t.startReconnect();
471         }
472
473         return new ReconnectCohort();
474     }
475
476     private void onDestroyComplete(final Response<?, ?> response) {
477         LOG.debug("Proxy {} destroy completed with {}", this, response);
478
479         lock.lock();
480         try {
481             parent.onProxyDestroyed(this);
482             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
483                 this::onPurgeComplete);
484         } finally {
485             lock.unlock();
486         }
487     }
488
489     private void onPurgeComplete(final Response<?, ?> response) {
490         LOG.debug("Proxy {} purge completed with {}", this, response);
491     }
492
493     @GuardedBy("lock")
494     void onTransactionAborted(final AbstractProxyTransaction tx) {
495         // No-op for most implementations
496     }
497
498     @GuardedBy("lock")
499     void onTransactionCompleted(final AbstractProxyTransaction tx) {
500         // No-op for most implementations
501     }
502
503     void onTransactionSealed(final AbstractProxyTransaction tx) {
504         // No-op on most implementations
505     }
506 }