e75a2df4c0d6084ba6629ff2926357a7b6a8a0f1
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.BiConsumer;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
27 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
29 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
34 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.Request;
36 import org.opendaylight.controller.cluster.access.concepts.RequestException;
37 import org.opendaylight.controller.cluster.access.concepts.Response;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Per-connection representation of a local history. This class handles state replication across a single connection.
47  *
48  * @author Robert Varga
49  */
50 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
51     private abstract static class AbstractLocal extends ProxyHistory {
52         private final DataTree dataTree;
53
54         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
55             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
56             super(parent, connection, identifier);
57             this.dataTree = Preconditions.checkNotNull(dataTree);
58         }
59
60         final DataTreeSnapshot takeSnapshot() {
61             return dataTree.takeSnapshot();
62         }
63     }
64
65     private abstract static class AbstractRemote extends ProxyHistory {
66         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
67             final LocalHistoryIdentifier identifier) {
68             super(parent, connection, identifier);
69         }
70     }
71
72     private static final class Local extends AbstractLocal {
73         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
74                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
75
76         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77         // the open one and attempts to create a new transaction again.
78         private LocalReadWriteProxyTransaction lastOpen;
79
80         private volatile LocalReadWriteProxyTransaction lastSealed;
81
82         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
83             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
84             super(parent, connection, identifier, dataTree);
85         }
86
87         @Override
88         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
90             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
91
92             if (isDone) {
93                 // Done transactions do not register on our radar on should not have any state associated.
94                 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
95                         : new LocalReadWriteProxyTransaction(this, txId);
96             }
97
98             // onTransactionCompleted() runs concurrently
99             final LocalReadWriteProxyTransaction localSealed = lastSealed;
100             final DataTreeSnapshot baseSnapshot;
101             if (localSealed != null) {
102                 baseSnapshot = localSealed.getSnapshot();
103             } else {
104                 baseSnapshot = takeSnapshot();
105             }
106
107             if (snapshotOnly) {
108                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
109             }
110
111             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
112             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
113             return lastOpen;
114         }
115
116         @Override
117         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
118             return createClient(parent(), connection, getIdentifier());
119         }
120
121         @Override
122         void onTransactionAborted(final AbstractProxyTransaction tx) {
123             if (tx.equals(lastOpen)) {
124                 lastOpen = null;
125             }
126         }
127
128         @Override
129         void onTransactionCompleted(final AbstractProxyTransaction tx) {
130             Verify.verify(tx instanceof LocalProxyTransaction);
131             if (tx instanceof LocalReadWriteProxyTransaction) {
132                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
133                     LOG.debug("Completed last sealed transaction {}", tx);
134                 }
135             }
136         }
137
138         @Override
139         void onTransactionSealed(final AbstractProxyTransaction tx) {
140             Preconditions.checkState(tx.equals(lastOpen));
141             lastSealed = lastOpen;
142             lastOpen = null;
143         }
144     }
145
146     private static final class LocalSingle extends AbstractLocal {
147         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
148             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
149             super(parent, connection, identifier, dataTree);
150         }
151
152         @Override
153         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
154                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
155             final DataTreeSnapshot snapshot = takeSnapshot();
156             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
157                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
158         }
159
160         @Override
161         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
162             return createSingle(parent(), connection, getIdentifier());
163         }
164     }
165
166     private static final class Remote extends AbstractRemote {
167         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
168             final LocalHistoryIdentifier identifier) {
169             super(parent, connection, identifier);
170         }
171
172         @Override
173         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
174                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
175             return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
176         }
177
178         @Override
179         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
180             return createClient(parent(), connection, getIdentifier());
181         }
182     }
183
184     private static final class RemoteSingle extends AbstractRemote {
185         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
186             final LocalHistoryIdentifier identifier) {
187             super(parent, connection, identifier);
188         }
189
190         @Override
191         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
192                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
193             return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
194         }
195
196         @Override
197         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
198             return createSingle(parent(), connection, getIdentifier());
199         }
200     }
201
202     private static final class RequestReplayException extends RequestException {
203         private static final long serialVersionUID = 1L;
204
205         RequestReplayException(final String format, final Object... args) {
206             super(String.format(format, args));
207         }
208
209         @Override
210         public boolean isRetriable() {
211             return false;
212         }
213     }
214
215     private final class ReconnectCohort extends ProxyReconnectCohort {
216         @Override
217         public LocalHistoryIdentifier getIdentifier() {
218             return identifier;
219         }
220
221         @GuardedBy("lock")
222         @Override
223         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
224             // First look for our Create message
225             Iterator<ConnectionEntry> it = previousEntries.iterator();
226             while (it.hasNext()) {
227                 final ConnectionEntry e = it.next();
228                 final Request<?, ?> req = e.getRequest();
229                 if (identifier.equals(req.getTarget())) {
230                     Verify.verify(req instanceof LocalHistoryRequest);
231                     if (req instanceof CreateLocalHistoryRequest) {
232                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
233                         it.remove();
234                         break;
235                     }
236                 }
237             }
238
239             for (AbstractProxyTransaction t : proxies.values()) {
240                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
241                 t.replayMessages(successor, previousEntries);
242             }
243
244             // Now look for any finalizing messages
245             it = previousEntries.iterator();
246             while (it.hasNext()) {
247                 final ConnectionEntry e  = it.next();
248                 final Request<?, ?> req = e.getRequest();
249                 if (identifier.equals(req.getTarget())) {
250                     Verify.verify(req instanceof LocalHistoryRequest);
251                     if (req instanceof DestroyLocalHistoryRequest) {
252                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
253                         it.remove();
254                         break;
255                     }
256                 }
257             }
258         }
259
260         @GuardedBy("lock")
261         @Override
262         ProxyHistory finishReconnect() {
263             final ProxyHistory ret = Verify.verifyNotNull(successor);
264
265             for (AbstractProxyTransaction t : proxies.values()) {
266                 t.finishReconnect();
267             }
268
269             LOG.debug("Finished reconnecting proxy history {}", this);
270             lock.unlock();
271             return ret;
272         }
273
274         @Override
275         void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
276                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
277             // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
278             //        period required to get into the queue.
279             if (request instanceof TransactionRequest) {
280                 forwardTransactionRequest((TransactionRequest<?>) request, callback);
281             } else if (request instanceof LocalHistoryRequest) {
282                 forwardTo.accept(request, callback);
283             } else {
284                 throw new IllegalArgumentException("Unhandled request " + request);
285             }
286         }
287
288         private void forwardTransactionRequest(final TransactionRequest<?> request,
289                 final Consumer<Response<?, ?>> callback) throws RequestException {
290
291             final AbstractProxyTransaction proxy;
292             lock.lock();
293             try {
294                 proxy = proxies.get(request.getTarget());
295             } finally {
296                 lock.unlock();
297             }
298             if (proxy == null) {
299                 throw new RequestReplayException("Failed to find proxy for %s", request);
300             }
301
302             proxy.forwardRequest(request, callback);
303         }
304     }
305
306     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
307
308     private final Lock lock = new ReentrantLock();
309     private final LocalHistoryIdentifier identifier;
310     private final AbstractClientConnection<ShardBackendInfo> connection;
311     private final AbstractClientHistory parent;
312
313     @GuardedBy("lock")
314     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
315     @GuardedBy("lock")
316     private ProxyHistory successor;
317
318     private ProxyHistory(final AbstractClientHistory parent,
319             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
320         this.parent = Preconditions.checkNotNull(parent);
321         this.connection = Preconditions.checkNotNull(connection);
322         this.identifier = Preconditions.checkNotNull(identifier);
323     }
324
325     static ProxyHistory createClient(final AbstractClientHistory parent,
326             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
327         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
328         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
329              : new Remote(parent, connection, identifier);
330     }
331
332     static ProxyHistory createSingle(final AbstractClientHistory parent,
333             final AbstractClientConnection<ShardBackendInfo> connection,
334             final LocalHistoryIdentifier identifier) {
335         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
336         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
337              : new RemoteSingle(parent, connection, identifier);
338     }
339
340     @Override
341     public LocalHistoryIdentifier getIdentifier() {
342         return identifier;
343     }
344
345     final ClientActorContext context() {
346         return connection.context();
347     }
348
349     final long currentTime() {
350         return connection.currentTime();
351     }
352
353     final ActorRef localActor() {
354         return connection.localActor();
355     }
356
357     final AbstractClientHistory parent() {
358         return parent;
359     }
360
361     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
362             final boolean snapshotOnly) {
363         return createTransactionProxy(txId, snapshotOnly, false);
364     }
365
366     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
367             final boolean isDone) {
368         lock.lock();
369         try {
370             if (successor != null) {
371                 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
372             }
373
374             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
375             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
376             proxies.put(proxyId, ret);
377             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
378             return ret;
379         } finally {
380             lock.unlock();
381         }
382     }
383
384     final void abortTransaction(final AbstractProxyTransaction tx) {
385         lock.lock();
386         try {
387             // Removal will be completed once purge completes
388             LOG.debug("Proxy {} aborted transaction {}", this, tx);
389             onTransactionAborted(tx);
390         } finally {
391             lock.unlock();
392         }
393     }
394
395     final void completeTransaction(final AbstractProxyTransaction tx) {
396         lock.lock();
397         try {
398             // Removal will be completed once purge completes
399             LOG.debug("Proxy {} completing transaction {}", this, tx);
400             onTransactionCompleted(tx);
401         } finally {
402             lock.unlock();
403         }
404     }
405
406     void purgeTransaction(final AbstractProxyTransaction tx) {
407         lock.lock();
408         try {
409             proxies.remove(tx.getIdentifier());
410             LOG.debug("Proxy {} purged transaction {}", this, tx);
411         } finally {
412             lock.unlock();
413         }
414     }
415
416     final void close() {
417         lock.lock();
418         try {
419             if (successor != null) {
420                 successor.close();
421                 return;
422             }
423
424             LOG.debug("Proxy {} invoking destroy", this);
425             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
426                 this::onDestroyComplete);
427         } finally {
428             lock.unlock();
429         }
430     }
431
432     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
433             final long enqueuedTicks) {
434         connection.enqueueRequest(request, callback, enqueuedTicks);
435     }
436
437     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
438         connection.sendRequest(request, callback);
439     }
440
441     @GuardedBy("lock")
442     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
443             TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
444
445     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
446
447     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
448     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
449         lock.lock();
450         if (successor != null) {
451             lock.unlock();
452             throw new IllegalStateException("Proxy history " + this + " already has a successor");
453         }
454
455         successor = createSuccessor(newConnection);
456         LOG.debug("History {} instantiated successor {}", this, successor);
457
458         for (AbstractProxyTransaction t : proxies.values()) {
459             t.startReconnect();
460         }
461
462         return new ReconnectCohort();
463     }
464
465     private void onDestroyComplete(final Response<?, ?> response) {
466         LOG.debug("Proxy {} destroy completed with {}", this, response);
467
468         lock.lock();
469         try {
470             parent.onProxyDestroyed(this);
471             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
472                 this::onPurgeComplete);
473         } finally {
474             lock.unlock();
475         }
476     }
477
478     private void onPurgeComplete(final Response<?, ?> response) {
479         LOG.debug("Proxy {} purge completed with {}", this, response);
480     }
481
482     @GuardedBy("lock")
483     void onTransactionAborted(final AbstractProxyTransaction tx) {
484         // No-op for most implementations
485     }
486
487     @GuardedBy("lock")
488     void onTransactionCompleted(final AbstractProxyTransaction tx) {
489         // No-op for most implementations
490     }
491
492     void onTransactionSealed(final AbstractProxyTransaction tx) {
493         // No-op on most implementations
494     }
495 }