4764f24d495991f0082526fef2b569a62b828835
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.BiConsumer;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
27 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
29 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
34 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.Request;
36 import org.opendaylight.controller.cluster.access.concepts.RequestException;
37 import org.opendaylight.controller.cluster.access.concepts.Response;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Per-connection representation of a local history. This class handles state replication across a single connection.
47  *
48  * @author Robert Varga
49  */
50 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
51     private abstract static class AbstractLocal extends ProxyHistory {
52         private final DataTree dataTree;
53
54         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
55             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
56             super(parent, connection, identifier);
57             this.dataTree = Preconditions.checkNotNull(dataTree);
58         }
59
60         final DataTreeSnapshot takeSnapshot() {
61             return dataTree.takeSnapshot();
62         }
63     }
64
65     private abstract static class AbstractRemote extends ProxyHistory {
66         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
67             final LocalHistoryIdentifier identifier) {
68             super(parent, connection, identifier);
69         }
70     }
71
72     private static final class Local extends AbstractLocal {
73         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
74                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
75
76         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77         // the open one and attempts to create a new transaction again.
78         private LocalReadWriteProxyTransaction lastOpen;
79
80         private volatile LocalReadWriteProxyTransaction lastSealed;
81
82         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
83             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
84             super(parent, connection, identifier, dataTree);
85         }
86
87         @Override
88         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89                 final TransactionIdentifier txId, final boolean snapshotOnly) {
90             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
91
92             // onTransactionCompleted() runs concurrently
93             final LocalReadWriteProxyTransaction localSealed = lastSealed;
94             final DataTreeSnapshot baseSnapshot;
95             if (localSealed != null) {
96                 baseSnapshot = localSealed.getSnapshot();
97             } else {
98                 baseSnapshot = takeSnapshot();
99             }
100
101             if (snapshotOnly) {
102                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
103             }
104
105             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
106             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
107             return lastOpen;
108         }
109
110         @Override
111         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
112             return createClient(parent(), connection, getIdentifier());
113         }
114
115         @Override
116         void onTransactionAborted(final AbstractProxyTransaction tx) {
117             if (tx.equals(lastOpen)) {
118                 lastOpen = null;
119             }
120         }
121
122         @Override
123         void onTransactionCompleted(final AbstractProxyTransaction tx) {
124             Verify.verify(tx instanceof LocalProxyTransaction);
125             if (tx instanceof LocalReadWriteProxyTransaction) {
126                 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
127                     LOG.debug("Completed last sealed transaction {}", tx);
128                 }
129             }
130         }
131
132         @Override
133         void onTransactionSealed(final AbstractProxyTransaction tx) {
134             Preconditions.checkState(tx.equals(lastOpen));
135             lastSealed = lastOpen;
136             lastOpen = null;
137         }
138     }
139
140     private static final class LocalSingle extends AbstractLocal {
141         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
142             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
143             super(parent, connection, identifier, dataTree);
144         }
145
146         @Override
147         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
148                 final TransactionIdentifier txId, final boolean snapshotOnly) {
149             final DataTreeSnapshot snapshot = takeSnapshot();
150             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
151                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
152         }
153
154         @Override
155         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
156             return createSingle(parent(), connection, getIdentifier());
157         }
158     }
159
160     private static final class Remote extends AbstractRemote {
161         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
162             final LocalHistoryIdentifier identifier) {
163             super(parent, connection, identifier);
164         }
165
166         @Override
167         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
168                 final TransactionIdentifier txId, final boolean snapshotOnly) {
169             return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
170         }
171
172         @Override
173         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
174             return createClient(parent(), connection, getIdentifier());
175         }
176     }
177
178     private static final class RemoteSingle extends AbstractRemote {
179         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
180             final LocalHistoryIdentifier identifier) {
181             super(parent, connection, identifier);
182         }
183
184         @Override
185         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
186                 final TransactionIdentifier txId, final boolean snapshotOnly) {
187             return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
188         }
189
190         @Override
191         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
192             return createSingle(parent(), connection, getIdentifier());
193         }
194     }
195
196     private static final class RequestReplayException extends RequestException {
197         private static final long serialVersionUID = 1L;
198
199         RequestReplayException(final String format, final Object... args) {
200             super(String.format(format, args));
201         }
202
203         @Override
204         public boolean isRetriable() {
205             return false;
206         }
207     }
208
209     private final class ReconnectCohort extends ProxyReconnectCohort {
210         @Override
211         public LocalHistoryIdentifier getIdentifier() {
212             return identifier;
213         }
214
215         @GuardedBy("lock")
216         @Override
217         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
218             // First look for our Create message
219             Iterator<ConnectionEntry> it = previousEntries.iterator();
220             while (it.hasNext()) {
221                 final ConnectionEntry e = it.next();
222                 final Request<?, ?> req = e.getRequest();
223                 if (identifier.equals(req.getTarget())) {
224                     Verify.verify(req instanceof LocalHistoryRequest);
225                     if (req instanceof CreateLocalHistoryRequest) {
226                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
227                         it.remove();
228                         break;
229                     }
230                 }
231             }
232
233             for (AbstractProxyTransaction t : proxies.values()) {
234                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
235                 t.replayMessages(successor, previousEntries);
236             }
237
238             // Now look for any finalizing messages
239             it = previousEntries.iterator();
240             while (it.hasNext()) {
241                 final ConnectionEntry e  = it.next();
242                 final Request<?, ?> req = e.getRequest();
243                 if (identifier.equals(req.getTarget())) {
244                     Verify.verify(req instanceof LocalHistoryRequest);
245                     if (req instanceof DestroyLocalHistoryRequest) {
246                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
247                         it.remove();
248                         break;
249                     }
250                 }
251             }
252         }
253
254         @GuardedBy("lock")
255         @Override
256         ProxyHistory finishReconnect() {
257             final ProxyHistory ret = Verify.verifyNotNull(successor);
258
259             for (AbstractProxyTransaction t : proxies.values()) {
260                 t.finishReconnect();
261             }
262
263             LOG.debug("Finished reconnecting proxy history {}", this);
264             lock.unlock();
265             return ret;
266         }
267
268         @Override
269         void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
270                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
271             // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
272             //        period required to get into the queue.
273             if (request instanceof TransactionRequest) {
274                 forwardTransactionRequest((TransactionRequest<?>) request, callback);
275             } else if (request instanceof LocalHistoryRequest) {
276                 forwardTo.accept(request, callback);
277             } else {
278                 throw new IllegalArgumentException("Unhandled request " + request);
279             }
280         }
281
282         private void forwardTransactionRequest(final TransactionRequest<?> request,
283                 final Consumer<Response<?, ?>> callback) throws RequestException {
284
285             final AbstractProxyTransaction proxy;
286             lock.lock();
287             try {
288                 proxy = proxies.get(request.getTarget());
289             } finally {
290                 lock.unlock();
291             }
292             if (proxy == null) {
293                 throw new RequestReplayException("Failed to find proxy for %s", request);
294             }
295
296             proxy.forwardRequest(request, callback);
297         }
298     }
299
300     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
301
302     private final Lock lock = new ReentrantLock();
303     private final LocalHistoryIdentifier identifier;
304     private final AbstractClientConnection<ShardBackendInfo> connection;
305     private final AbstractClientHistory parent;
306
307     @GuardedBy("lock")
308     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
309     @GuardedBy("lock")
310     private ProxyHistory successor;
311
312     private ProxyHistory(final AbstractClientHistory parent,
313             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
314         this.parent = Preconditions.checkNotNull(parent);
315         this.connection = Preconditions.checkNotNull(connection);
316         this.identifier = Preconditions.checkNotNull(identifier);
317     }
318
319     static ProxyHistory createClient(final AbstractClientHistory parent,
320             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
321         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
322         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
323              : new Remote(parent, connection, identifier);
324     }
325
326     static ProxyHistory createSingle(final AbstractClientHistory parent,
327             final AbstractClientConnection<ShardBackendInfo> connection,
328             final LocalHistoryIdentifier identifier) {
329         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
330         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
331              : new RemoteSingle(parent, connection, identifier);
332     }
333
334     @Override
335     public LocalHistoryIdentifier getIdentifier() {
336         return identifier;
337     }
338
339     final ClientActorContext context() {
340         return connection.context();
341     }
342
343     final long currentTime() {
344         return connection.currentTime();
345     }
346
347     final ActorRef localActor() {
348         return connection.localActor();
349     }
350
351     final AbstractClientHistory parent() {
352         return parent;
353     }
354
355     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
356             final boolean snapshotOnly) {
357         lock.lock();
358         try {
359             if (successor != null) {
360                 return successor.createTransactionProxy(txId, snapshotOnly);
361             }
362
363             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
364             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
365             proxies.put(proxyId, ret);
366             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
367             return ret;
368         } finally {
369             lock.unlock();
370         }
371     }
372
373     final void abortTransaction(final AbstractProxyTransaction tx) {
374         lock.lock();
375         try {
376             // Removal will be completed once purge completes
377             LOG.debug("Proxy {} aborted transaction {}", this, tx);
378             onTransactionAborted(tx);
379         } finally {
380             lock.unlock();
381         }
382     }
383
384     final void completeTransaction(final AbstractProxyTransaction tx) {
385         lock.lock();
386         try {
387             proxies.remove(tx.getIdentifier());
388             LOG.debug("Proxy {} completing transaction {}", this, tx);
389             onTransactionCompleted(tx);
390         } finally {
391             lock.unlock();
392         }
393     }
394
395     final void close() {
396         lock.lock();
397         try {
398             if (successor != null) {
399                 successor.close();
400                 return;
401             }
402
403             LOG.debug("Proxy {} invoking destroy", this);
404             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
405                 this::onDestroyComplete);
406         } finally {
407             lock.unlock();
408         }
409     }
410
411     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
412             final long enqueuedTicks) {
413         connection.enqueueRequest(request, callback, enqueuedTicks);
414     }
415
416     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
417         connection.sendRequest(request, callback);
418     }
419
420     @GuardedBy("lock")
421     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
422             TransactionIdentifier txId, boolean snapshotOnly);
423
424     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
425
426     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
427     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
428         lock.lock();
429         if (successor != null) {
430             lock.unlock();
431             throw new IllegalStateException("Proxy history " + this + " already has a successor");
432         }
433
434         successor = createSuccessor(newConnection);
435         LOG.debug("History {} instantiated successor {}", this, successor);
436
437         for (AbstractProxyTransaction t : proxies.values()) {
438             t.startReconnect();
439         }
440
441         return new ReconnectCohort();
442     }
443
444     private void onDestroyComplete(final Response<?, ?> response) {
445         LOG.debug("Proxy {} destroy completed with {}", this, response);
446
447         lock.lock();
448         try {
449             parent.onProxyDestroyed(this);
450             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
451                 this::onPurgeComplete);
452         } finally {
453             lock.unlock();
454         }
455     }
456
457     private void onPurgeComplete(final Response<?, ?> response) {
458         LOG.debug("Proxy {} purge completed with {}", this, response);
459     }
460
461     @GuardedBy("lock")
462     void onTransactionAborted(final AbstractProxyTransaction tx) {
463         // No-op for most implementations
464     }
465
466     @GuardedBy("lock")
467     void onTransactionCompleted(final AbstractProxyTransaction tx) {
468         // No-op for most implementations
469     }
470
471     void onTransactionSealed(final AbstractProxyTransaction tx) {
472         // No-op on most implementations
473     }
474 }