2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.BiConsumer;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
27 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
29 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
34 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
35 import org.opendaylight.controller.cluster.access.concepts.Request;
36 import org.opendaylight.controller.cluster.access.concepts.RequestException;
37 import org.opendaylight.controller.cluster.access.concepts.Response;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.yangtools.concepts.Identifiable;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
41 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
46 * Per-connection representation of a local history. This class handles state replication across a single connection.
48 * @author Robert Varga
50 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
51 private abstract static class AbstractLocal extends ProxyHistory {
52 private final DataTree dataTree;
54 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
55 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
56 super(parent, connection, identifier);
57 this.dataTree = Preconditions.checkNotNull(dataTree);
60 final DataTreeSnapshot takeSnapshot() {
61 return dataTree.takeSnapshot();
65 private abstract static class AbstractRemote extends ProxyHistory {
66 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
67 final LocalHistoryIdentifier identifier) {
68 super(parent, connection, identifier);
72 private static final class Local extends AbstractLocal {
73 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
74 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
76 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77 // the open one and attempts to create a new transaction again.
78 private LocalReadWriteProxyTransaction lastOpen;
80 private volatile LocalReadWriteProxyTransaction lastSealed;
82 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
83 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
84 super(parent, connection, identifier, dataTree);
88 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89 final TransactionIdentifier txId, final boolean snapshotOnly) {
90 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
92 // onTransactionCompleted() runs concurrently
93 final LocalReadWriteProxyTransaction localSealed = lastSealed;
94 final DataTreeSnapshot baseSnapshot;
95 if (localSealed != null) {
96 baseSnapshot = localSealed.getSnapshot();
98 baseSnapshot = takeSnapshot();
102 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
105 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
106 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
111 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
112 return createClient(parent(), connection, getIdentifier());
116 void onTransactionAborted(final AbstractProxyTransaction tx) {
117 if (tx.equals(lastOpen)) {
123 void onTransactionCompleted(final AbstractProxyTransaction tx) {
124 Verify.verify(tx instanceof LocalProxyTransaction);
125 if (tx instanceof LocalReadWriteProxyTransaction) {
126 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
127 LOG.debug("Completed last sealed transaction {}", tx);
133 void onTransactionSealed(final AbstractProxyTransaction tx) {
134 Preconditions.checkState(tx.equals(lastOpen));
135 lastSealed = lastOpen;
140 private static final class LocalSingle extends AbstractLocal {
141 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
142 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
143 super(parent, connection, identifier, dataTree);
147 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
148 final TransactionIdentifier txId, final boolean snapshotOnly) {
149 final DataTreeSnapshot snapshot = takeSnapshot();
150 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
151 new LocalReadWriteProxyTransaction(this, txId, snapshot);
155 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
156 return createSingle(parent(), connection, getIdentifier());
160 private static final class Remote extends AbstractRemote {
161 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
162 final LocalHistoryIdentifier identifier) {
163 super(parent, connection, identifier);
167 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
168 final TransactionIdentifier txId, final boolean snapshotOnly) {
169 return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
173 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
174 return createClient(parent(), connection, getIdentifier());
178 private static final class RemoteSingle extends AbstractRemote {
179 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
180 final LocalHistoryIdentifier identifier) {
181 super(parent, connection, identifier);
185 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
186 final TransactionIdentifier txId, final boolean snapshotOnly) {
187 return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
191 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
192 return createSingle(parent(), connection, getIdentifier());
196 private static final class RequestReplayException extends RequestException {
197 private static final long serialVersionUID = 1L;
199 RequestReplayException(final String format, final Object... args) {
200 super(String.format(format, args));
204 public boolean isRetriable() {
209 private final class ReconnectCohort extends ProxyReconnectCohort {
211 public LocalHistoryIdentifier getIdentifier() {
217 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
218 // First look for our Create message
219 Iterator<ConnectionEntry> it = previousEntries.iterator();
220 while (it.hasNext()) {
221 final ConnectionEntry e = it.next();
222 final Request<?, ?> req = e.getRequest();
223 if (identifier.equals(req.getTarget())) {
224 Verify.verify(req instanceof LocalHistoryRequest);
225 if (req instanceof CreateLocalHistoryRequest) {
226 successor.connection.sendRequest(req, e.getCallback());
233 for (AbstractProxyTransaction t : proxies.values()) {
234 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
235 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
237 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
238 t.replayMessages(newProxy, previousEntries);
241 // Now look for any finalizing messages
242 it = previousEntries.iterator();
243 while (it.hasNext()) {
244 final ConnectionEntry e = it.next();
245 final Request<?, ?> req = e.getRequest();
246 if (identifier.equals(req.getTarget())) {
247 Verify.verify(req instanceof LocalHistoryRequest);
248 if (req instanceof DestroyLocalHistoryRequest) {
249 successor.connection.sendRequest(req, e.getCallback());
259 ProxyHistory finishReconnect() {
260 final ProxyHistory ret = Verify.verifyNotNull(successor);
262 for (AbstractProxyTransaction t : proxies.values()) {
266 LOG.debug("Finished reconnecting proxy history {}", this);
272 void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
273 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
274 // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
275 // period required to get into the queue.
276 if (request instanceof TransactionRequest) {
277 forwardTransactionRequest((TransactionRequest<?>) request, callback);
278 } else if (request instanceof LocalHistoryRequest) {
279 forwardTo.accept(request, callback);
281 throw new IllegalArgumentException("Unhandled request " + request);
285 private void forwardTransactionRequest(final TransactionRequest<?> request,
286 final Consumer<Response<?, ?>> callback) throws RequestException {
288 final AbstractProxyTransaction proxy;
291 proxy = proxies.get(request.getTarget());
296 throw new RequestReplayException("Failed to find proxy for %s", request);
299 proxy.forwardRequest(request, callback);
303 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
305 private final Lock lock = new ReentrantLock();
306 private final LocalHistoryIdentifier identifier;
307 private final AbstractClientConnection<ShardBackendInfo> connection;
308 private final AbstractClientHistory parent;
311 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
313 private ProxyHistory successor;
315 private ProxyHistory(final AbstractClientHistory parent,
316 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
317 this.parent = Preconditions.checkNotNull(parent);
318 this.connection = Preconditions.checkNotNull(connection);
319 this.identifier = Preconditions.checkNotNull(identifier);
322 static ProxyHistory createClient(final AbstractClientHistory parent,
323 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
324 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
325 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
326 : new Remote(parent, connection, identifier);
329 static ProxyHistory createSingle(final AbstractClientHistory parent,
330 final AbstractClientConnection<ShardBackendInfo> connection,
331 final LocalHistoryIdentifier identifier) {
332 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
333 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
334 : new RemoteSingle(parent, connection, identifier);
338 public LocalHistoryIdentifier getIdentifier() {
342 final ClientActorContext context() {
343 return connection.context();
346 final long currentTime() {
347 return connection.currentTime();
350 final ActorRef localActor() {
351 return connection.localActor();
354 final AbstractClientHistory parent() {
358 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
359 final boolean snapshotOnly) {
362 if (successor != null) {
363 return successor.createTransactionProxy(txId, snapshotOnly);
366 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
367 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
368 proxies.put(proxyId, ret);
369 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
376 final void abortTransaction(final AbstractProxyTransaction tx) {
379 // Removal will be completed once purge completes
380 LOG.debug("Proxy {} aborted transaction {}", this, tx);
381 onTransactionAborted(tx);
387 final void completeTransaction(final AbstractProxyTransaction tx) {
390 proxies.remove(tx.getIdentifier());
391 LOG.debug("Proxy {} completing transaction {}", this, tx);
392 onTransactionCompleted(tx);
401 if (successor != null) {
406 LOG.debug("Proxy {} invoking destroy", this);
407 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
408 this::onDestroyComplete);
414 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
415 final long enqueuedTicks) {
416 connection.enqueueRequest(request, callback, enqueuedTicks);
419 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
420 connection.sendRequest(request, callback);
424 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
425 TransactionIdentifier txId, boolean snapshotOnly);
427 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
429 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
430 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
432 if (successor != null) {
434 throw new IllegalStateException("Proxy history " + this + " already has a successor");
437 successor = createSuccessor(newConnection);
438 LOG.debug("History {} instantiated successor {}", this, successor);
440 for (AbstractProxyTransaction t : proxies.values()) {
444 return new ReconnectCohort();
447 private void onDestroyComplete(final Response<?, ?> response) {
448 LOG.debug("Proxy {} destroy completed with {}", this, response);
452 parent.onProxyDestroyed(this);
453 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
454 this::onPurgeComplete);
460 private void onPurgeComplete(final Response<?, ?> response) {
461 LOG.debug("Proxy {} purge completed with {}", this, response);
465 void onTransactionAborted(final AbstractProxyTransaction tx) {
466 // No-op for most implementations
470 void onTransactionCompleted(final AbstractProxyTransaction tx) {
471 // No-op for most implementations
474 void onTransactionSealed(final AbstractProxyTransaction tx) {
475 // No-op on most implementations