/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Per-connection representation of a local history. This class handles state replication across a single connection.
 *
 * @author Robert Varga
 */
50 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
51 private abstract static class AbstractLocal extends ProxyHistory {
52 private final DataTree dataTree;
54 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
55 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
56 super(parent, connection, identifier);
57 this.dataTree = Preconditions.checkNotNull(dataTree);
60 final DataTreeSnapshot takeSnapshot() {
61 return dataTree.takeSnapshot();
65 private abstract static class AbstractRemote extends ProxyHistory {
66 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
67 final LocalHistoryIdentifier identifier) {
68 super(parent, connection, identifier);
72 private static final class Local extends AbstractLocal {
73 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
74 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
76 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77 // the open one and attempts to create a new transaction again.
78 private LocalReadWriteProxyTransaction lastOpen;
80 private volatile LocalReadWriteProxyTransaction lastSealed;
82 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
83 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
84 super(parent, connection, identifier, dataTree);
88 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
90 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
93 // Done transactions do not register on our radar on should not have any state associated.
94 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
95 : new LocalReadWriteProxyTransaction(this, txId);
98 // onTransactionCompleted() runs concurrently
99 final LocalReadWriteProxyTransaction localSealed = lastSealed;
100 final DataTreeSnapshot baseSnapshot;
101 if (localSealed != null) {
102 baseSnapshot = localSealed.getSnapshot();
104 baseSnapshot = takeSnapshot();
108 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
111 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
112 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
117 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
118 return createClient(parent(), connection, getIdentifier());
122 void onTransactionAborted(final AbstractProxyTransaction tx) {
123 if (tx.equals(lastOpen)) {
129 void onTransactionCompleted(final AbstractProxyTransaction tx) {
130 Verify.verify(tx instanceof LocalProxyTransaction);
131 if (tx instanceof LocalReadWriteProxyTransaction) {
132 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
133 LOG.debug("Completed last sealed transaction {}", tx);
139 void onTransactionSealed(final AbstractProxyTransaction tx) {
140 Preconditions.checkState(tx.equals(lastOpen));
141 lastSealed = lastOpen;
146 private static final class LocalSingle extends AbstractLocal {
147 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
148 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
149 super(parent, connection, identifier, dataTree);
153 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
154 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
155 final DataTreeSnapshot snapshot = takeSnapshot();
156 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
157 new LocalReadWriteProxyTransaction(this, txId, snapshot);
161 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
162 return createSingle(parent(), connection, getIdentifier());
166 private static final class Remote extends AbstractRemote {
167 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
168 final LocalHistoryIdentifier identifier) {
169 super(parent, connection, identifier);
173 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
174 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
175 return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
179 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
180 return createClient(parent(), connection, getIdentifier());
184 private static final class RemoteSingle extends AbstractRemote {
185 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
186 final LocalHistoryIdentifier identifier) {
187 super(parent, connection, identifier);
191 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
192 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
193 return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
197 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
198 return createSingle(parent(), connection, getIdentifier());
202 private static final class RequestReplayException extends RequestException {
203 private static final long serialVersionUID = 1L;
205 RequestReplayException(final String format, final Object... args) {
206 super(String.format(format, args));
210 public boolean isRetriable() {
215 private final class ReconnectCohort extends ProxyReconnectCohort {
217 public LocalHistoryIdentifier getIdentifier() {
223 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
224 // First look for our Create message
225 Iterator<ConnectionEntry> it = previousEntries.iterator();
226 while (it.hasNext()) {
227 final ConnectionEntry e = it.next();
228 final Request<?, ?> req = e.getRequest();
229 if (identifier.equals(req.getTarget())) {
230 Verify.verify(req instanceof LocalHistoryRequest);
231 if (req instanceof CreateLocalHistoryRequest) {
232 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
239 for (AbstractProxyTransaction t : proxies.values()) {
240 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
241 t.replayMessages(successor, previousEntries);
244 // Now look for any finalizing messages
245 it = previousEntries.iterator();
246 while (it.hasNext()) {
247 final ConnectionEntry e = it.next();
248 final Request<?, ?> req = e.getRequest();
249 if (identifier.equals(req.getTarget())) {
250 Verify.verify(req instanceof LocalHistoryRequest);
251 if (req instanceof DestroyLocalHistoryRequest) {
252 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
262 ProxyHistory finishReconnect() {
263 final ProxyHistory ret = Verify.verifyNotNull(successor);
265 for (AbstractProxyTransaction t : proxies.values()) {
269 LOG.debug("Finished reconnecting proxy history {}", this);
275 void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
276 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
277 // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
278 // period required to get into the queue.
279 if (request instanceof TransactionRequest) {
280 forwardTransactionRequest((TransactionRequest<?>) request, callback);
281 } else if (request instanceof LocalHistoryRequest) {
282 forwardTo.accept(request, callback);
284 throw new IllegalArgumentException("Unhandled request " + request);
288 private void forwardTransactionRequest(final TransactionRequest<?> request,
289 final Consumer<Response<?, ?>> callback) throws RequestException {
291 final AbstractProxyTransaction proxy;
294 proxy = proxies.get(request.getTarget());
299 throw new RequestReplayException("Failed to find proxy for %s", request);
302 proxy.forwardRequest(request, callback);
306 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
308 private final Lock lock = new ReentrantLock();
309 private final LocalHistoryIdentifier identifier;
310 private final AbstractClientConnection<ShardBackendInfo> connection;
311 private final AbstractClientHistory parent;
314 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
316 private ProxyHistory successor;
/**
 * Creates the proxy. Private, so only the nested subclasses can instantiate it.
 *
 * @param parent owning client history, must not be null
 * @param connection connection this proxy is bound to, must not be null
 * @param identifier history identifier, must not be null
 */
private ProxyHistory(final AbstractClientHistory parent,
        final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
    this.parent = Preconditions.checkNotNull(parent);
    this.connection = Preconditions.checkNotNull(connection);
    this.identifier = Preconditions.checkNotNull(identifier);
}
325 static ProxyHistory createClient(final AbstractClientHistory parent,
326 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
327 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
328 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
329 : new Remote(parent, connection, identifier);
332 static ProxyHistory createSingle(final AbstractClientHistory parent,
333 final AbstractClientConnection<ShardBackendInfo> connection,
334 final LocalHistoryIdentifier identifier) {
335 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
336 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
337 : new RemoteSingle(parent, connection, identifier);
341 public LocalHistoryIdentifier getIdentifier() {
345 final ClientActorContext context() {
346 return connection.context();
349 final long currentTime() {
350 return connection.currentTime();
353 final ActorRef localActor() {
354 return connection.localActor();
357 final AbstractClientHistory parent() {
361 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
362 final boolean snapshotOnly) {
363 return createTransactionProxy(txId, snapshotOnly, false);
366 AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
367 final boolean isDone) {
370 if (successor != null) {
371 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
374 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
375 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
376 proxies.put(proxyId, ret);
377 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
384 final void abortTransaction(final AbstractProxyTransaction tx) {
387 // Removal will be completed once purge completes
388 LOG.debug("Proxy {} aborted transaction {}", this, tx);
389 onTransactionAborted(tx);
395 final void completeTransaction(final AbstractProxyTransaction tx) {
398 // Removal will be completed once purge completes
399 LOG.debug("Proxy {} completing transaction {}", this, tx);
400 onTransactionCompleted(tx);
406 void purgeTransaction(final AbstractProxyTransaction tx) {
409 proxies.remove(tx.getIdentifier());
410 LOG.debug("Proxy {} purged transaction {}", this, tx);
419 if (successor != null) {
424 LOG.debug("Proxy {} invoking destroy", this);
425 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
426 this::onDestroyComplete);
432 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
433 final long enqueuedTicks) {
434 connection.enqueueRequest(request, callback, enqueuedTicks);
437 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
438 connection.sendRequest(request, callback);
442 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
443 TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
445 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
447 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
448 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
450 if (successor != null) {
452 throw new IllegalStateException("Proxy history " + this + " already has a successor");
455 successor = createSuccessor(newConnection);
456 LOG.debug("History {} instantiated successor {}", this, successor);
458 for (AbstractProxyTransaction t : proxies.values()) {
462 return new ReconnectCohort();
465 private void onDestroyComplete(final Response<?, ?> response) {
466 LOG.debug("Proxy {} destroy completed with {}", this, response);
470 parent.onProxyDestroyed(this);
471 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
472 this::onPurgeComplete);
478 private void onPurgeComplete(final Response<?, ?> response) {
479 LOG.debug("Proxy {} purge completed with {}", this, response);
483 void onTransactionAborted(final AbstractProxyTransaction tx) {
484 // No-op for most implementations
488 void onTransactionCompleted(final AbstractProxyTransaction tx) {
489 // No-op for most implementations
492 void onTransactionSealed(final AbstractProxyTransaction tx) {
493 // No-op on most implementations