2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
26 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
27 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
28 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
29 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
30 import org.opendaylight.controller.cluster.access.concepts.Request;
31 import org.opendaylight.controller.cluster.access.concepts.RequestException;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.yangtools.concepts.Identifiable;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
36 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * Per-connection representation of a local history. This class handles state replication across a single connection.
43 * @author Robert Varga
45 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
46 private abstract static class AbstractLocal extends ProxyHistory {
47 private final DataTree dataTree;
49 AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
50 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
51 super(connection, identifier);
52 this.dataTree = Preconditions.checkNotNull(dataTree);
55 final DataTreeSnapshot takeSnapshot() {
56 return dataTree.takeSnapshot();
60 private abstract static class AbstractRemote extends ProxyHistory {
61 AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
62 final LocalHistoryIdentifier identifier) {
63 super(connection, identifier);
// Proxy history instantiated by createClient() when the backend info provides a DataTree. It remembers the most
// recently opened and the most recently sealed read-write transaction (see the field comment below).
// NOTE(review): this excerpt elides a number of short lines (closing braces, annotations and some statements);
// the comments added here describe only what the visible statements establish.
67 private static final class Local extends AbstractLocal {
// Field updater used by onTransactionCompleted() to compare-and-set the "lastSealed" field.
68 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
69 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
71 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
72 // the open one and attempts to create a new transaction again.
73 private LocalReadWriteProxyTransaction lastOpen;
// Declared volatile: it is read in doCreateTransactionProxy() and cleared through LAST_SEALED_UPDATER in
// onTransactionCompleted(), which the comment in doCreateTransactionProxy() says runs concurrently.
75 private volatile LocalReadWriteProxyTransaction lastSealed;
// Passes all three arguments straight to AbstractLocal.
77 Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
78 final DataTree dataTree) {
79 super(connection, identifier, dataTree);
// Allocates a transaction proxy. checkState() rejects the call (IllegalStateException) while a previously
// opened read-write transaction is still recorded in lastOpen.
83 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
84 final TransactionIdentifier txId, final boolean snapshotOnly) {
85 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
87 // onTransactionCompleted() runs concurrently
// Read the volatile field once so the null check and the use below see the same value.
88 final LocalReadWriteProxyTransaction localSealed = lastSealed;
89 final DataTreeSnapshot baseSnapshot;
// Base the new transaction on the last sealed transaction's snapshot when there is one ...
90 if (localSealed != null) {
91 baseSnapshot = localSealed.getSnapshot();
// ... and on a fresh data tree snapshot otherwise (the branch keyword itself is on an elided line).
93 baseSnapshot = takeSnapshot();
// NOTE(review): presumably guarded by a check of snapshotOnly on an elided line -- confirm against the full file.
97 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
// Record the newly created read-write transaction as the currently open one.
100 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
101 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
// The successor is created through createClient() with this history's identifier.
106 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
107 return createClient(connection, getIdentifier());
// Reacts only when the aborted transaction is the one recorded in lastOpen; the action taken is on lines
// elided from this excerpt.
111 void onTransactionAborted(final AbstractProxyTransaction tx) {
112 if (tx.equals(lastOpen)) {
// Verifies tx is a LocalProxyTransaction. For a read-write transaction, atomically resets lastSealed to null
// if and only if it still refers to tx, logging when that swap succeeds.
118 void onTransactionCompleted(final AbstractProxyTransaction tx) {
119 Verify.verify(tx instanceof LocalProxyTransaction);
120 if (tx instanceof LocalReadWriteProxyTransaction) {
121 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
122 LOG.debug("Completed last sealed transaction {}", tx);
// Requires tx to equal lastOpen (IllegalStateException otherwise) and records it as the last sealed
// transaction. Any further statements of this method are elided from this excerpt.
128 void onTransactionSealed(final AbstractProxyTransaction tx) {
129 Preconditions.checkState(tx.equals(lastOpen));
130 lastSealed = lastOpen;
135 private static final class LocalSingle extends AbstractLocal {
136 LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
137 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
138 super(connection, identifier, dataTree);
142 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
143 final TransactionIdentifier txId, final boolean snapshotOnly) {
144 final DataTreeSnapshot snapshot = takeSnapshot();
145 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
146 new LocalReadWriteProxyTransaction(this, txId, snapshot);
150 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
151 return createSingle(connection, getIdentifier());
155 private static final class Remote extends AbstractRemote {
156 Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
157 super(connection, identifier);
161 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
162 final TransactionIdentifier txId, final boolean snapshotOnly) {
163 return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
167 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
168 return createClient(connection, getIdentifier());
172 private static final class RemoteSingle extends AbstractRemote {
173 RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
174 final LocalHistoryIdentifier identifier) {
175 super(connection, identifier);
179 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
180 final TransactionIdentifier txId, final boolean snapshotOnly) {
181 return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
185 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
186 return createSingle(connection, getIdentifier());
// RequestException subtype thrown by ReconnectCohort.replayTransactionRequest() when a request being replayed
// cannot be matched to a transaction proxy.
190 private static final class RequestReplayException extends RequestException {
191 private static final long serialVersionUID = 1L;
// The message is rendered eagerly with String.format(), so 'format' uses %-style placeholders (not SLF4J's {}).
193 RequestReplayException(final String format, final Object... args) {
194 super(String.format(format, args));
// NOTE(review): the value returned here is on a line elided from this excerpt -- check the full file before
// relying on whether this exception is reported as retriable.
198 public boolean isRetriable() {
// Inner (non-static) cohort returned by startReconnect(). It operates directly on the enclosing history's
// 'identifier', 'proxies' and 'successor' fields to move state over to the successor history.
// NOTE(review): this excerpt elides a number of short lines (closing braces, annotations, lock handling, returns);
// the comments added here describe only what the visible statements establish.
203 private final class ReconnectCohort extends ProxyReconnectCohort {
// NOTE(review): body elided from this excerpt; presumably returns the enclosing history's identifier -- confirm.
205 public LocalHistoryIdentifier getIdentifier() {
// Replays state towards the successor in three passes over previousEntries / proxies, as laid out below.
211 void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
212 // First look for our Create message
213 for (ConnectionEntry e : previousEntries) {
214 final Request<?, ?> req = e.getRequest();
// Only requests targeting this history itself are considered; those must be LocalHistoryRequests.
215 if (identifier.equals(req.getTarget())) {
216 Verify.verify(req instanceof LocalHistoryRequest);
217 if (req instanceof CreateLocalHistoryRequest) {
// Re-issue the create request on the successor's connection, keeping the original callback.
218 successor.connection.sendRequest(req, e.getCallback());
// Second pass: give every tracked transaction proxy a counterpart in the successor and let the old proxy
// replay its messages into it.
224 for (AbstractProxyTransaction t : proxies.values()) {
225 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
// NOTE(review): the second argument of this call is on an elided line.
226 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
228 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
229 t.replayMessages(newProxy, previousEntries);
232 // Now look for any finalizing messages
// NOTE(review): as visible here, every request targeting this history is re-sent in this pass, with no
// filter between the verify and the send. That would include a CreateLocalHistoryRequest already sent by
// the first pass -- confirm whether the duplicate is intended.
233 for (ConnectionEntry e : previousEntries) {
234 final Request<?, ?> req = e.getRequest();
235 if (identifier.equals(req.getTarget())) {
236 Verify.verify(req instanceof LocalHistoryRequest);
237 successor.connection.sendRequest(req, e.getCallback());
// Completes the reconnect. Requires that a successor has been set (verifyNotNull), walks the tracked proxies
// (loop body elided from this excerpt) and logs completion.
244 ProxyHistory finishReconnect() {
245 final ProxyHistory ret = Verify.verifyNotNull(successor);
247 for (AbstractProxyTransaction t : proxies.values()) {
251 LOG.debug("Finished reconnecting proxy history {}", this);
// Dispatches a single request being replayed: transaction requests are routed to the owning transaction proxy,
// history requests are handed to 'replayTo' unchanged, anything else is rejected with IllegalArgumentException.
257 void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
258 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
259 if (request instanceof TransactionRequest) {
260 replayTransactionRequest((TransactionRequest<?>) request, callback);
261 } else if (request instanceof LocalHistoryRequest) {
262 replayTo.accept(request, callback);
264 throw new IllegalArgumentException("Unhandled request " + request);
// Looks up the proxy registered under the request's target and asks it to replay the request.
268 private void replayTransactionRequest(final TransactionRequest<?> request,
269 final Consumer<Response<?, ?>> callback) throws RequestException {
271 final AbstractProxyTransaction proxy;
// NOTE(review): the lines around this lookup are elided; presumably it happens with the history lock held.
274 proxy = proxies.get(request.getTarget());
// NOTE(review): the condition guarding this throw is elided; the message implies it fires when the lookup
// found no proxy.
279 throw new RequestReplayException("Failed to find proxy for %s", request);
282 proxy.replayRequest(request, callback);
// Logger shared by ProxyHistory and its nested classes.
286 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
// Lock instance. The statements acquiring and releasing it are elided from this excerpt; the FindBugs
// suppression on startReconnect() states that it is released asynchronously via the cohort.
288 private final Lock lock = new ReentrantLock();
// Identifier of the local history this proxy represents; also the history part of proxy transaction identifiers
// built in createTransactionProxy().
289 private final LocalHistoryIdentifier identifier;
// Connection used by sendRequest() and localActor(), and passed to doCreateTransactionProxy().
290 private final AbstractClientConnection<ShardBackendInfo> connection;
// Transaction proxies registered by createTransactionProxy() and removed again by abortTransaction() /
// completeTransaction(). LinkedHashMap keeps them in insertion order for the iterations during reconnect.
293 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
// Assigned in startReconnect(); once non-null, createTransactionProxy() delegates to it.
295 private ProxyHistory successor;
/**
 * Private constructor: instances are created through the nested subclasses only.
 *
 * @param connection connection this history is attached to, must not be null
 * @param identifier identifier of the local history, must not be null
 * @throws NullPointerException if either argument is null
 */
private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
        final LocalHistoryIdentifier identifier) {
    // The connection is validated first, so a null connection is reported before a null identifier.
    this.connection = Preconditions.checkNotNull(connection);
    this.identifier = Preconditions.checkNotNull(identifier);
}
303 static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
304 final LocalHistoryIdentifier identifier) {
305 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
306 return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
307 : new Remote(connection, identifier);
310 static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
311 final LocalHistoryIdentifier identifier) {
312 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
313 return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
314 : new RemoteSingle(connection, identifier);
// Accessor matching the Identifiable<LocalHistoryIdentifier> contract declared on the class.
// NOTE(review): the body is elided from this excerpt; presumably it returns the 'identifier' field -- confirm.
318 public LocalHistoryIdentifier getIdentifier() {
322 final ActorRef localActor() {
323 return connection.localActor();
// Allocates a transaction proxy for txId within this history and registers it in 'proxies'.
// NOTE(review): several short lines of this method (presumably lock handling and the final return) are elided
// from this excerpt; the comments below cover the visible statements only.
326 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
327 final boolean snapshotOnly) {
// Once a reconnect has installed a successor, new transactions are allocated there instead.
330 if (successor != null) {
331 return successor.createTransactionProxy(txId, snapshotOnly);
// The proxy gets its own identifier: this history's identifier combined with the caller's transaction id.
334 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
335 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
// Track the proxy under the rewritten identifier until it is aborted or completed.
336 proxies.put(proxyId, ret);
337 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
// Stops tracking an aborted transaction proxy and notifies the subclass via onTransactionAborted().
// NOTE(review): lines around these statements (presumably lock handling) are elided from this excerpt.
344 final void abortTransaction(final AbstractProxyTransaction tx) {
// The proxy is dropped from the map before the subclass hook runs.
347 proxies.remove(tx.getIdentifier());
348 LOG.debug("Proxy {} aborting transaction {}", this, tx);
349 onTransactionAborted(tx);
// Stops tracking a completed transaction proxy and notifies the subclass via onTransactionCompleted().
// NOTE(review): lines around these statements (presumably lock handling) are elided from this excerpt.
355 final void completeTransaction(final AbstractProxyTransaction tx) {
// The proxy is dropped from the map before the subclass hook runs.
358 proxies.remove(tx.getIdentifier());
359 LOG.debug("Proxy {} completing transaction {}", this, tx);
360 onTransactionCompleted(tx);
366 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
367 connection.sendRequest(request, callback);
// Subclass hook invoked by createTransactionProxy() to instantiate the concrete proxy. 'txId' is the rewritten
// proxy identifier (history identifier plus transaction id), not the identifier originally supplied by the caller.
371 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
372 TransactionIdentifier txId, boolean snapshotOnly);
// Subclass hook invoked by startReconnect() to instantiate the history which takes over on the new connection.
374 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
// Begins moving this history to a new connection: creates the successor history and returns a ReconnectCohort
// which operates on it. Fails with IllegalStateException if a successor already exists.
// NOTE(review): the lock acquisition/release statements and the body of the loop below are elided from this
// excerpt; per the suppression justification the lock is not released before returning on the success path.
376 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
377 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
// A history can be reconnected only once.
379 if (successor != null) {
381 throw new IllegalStateException("Proxy history " + this + " already has a successor");
// Subclasses decide which flavour of history takes over on the new connection.
384 successor = createSuccessor(newConnection);
385 LOG.debug("History {} instantiated successor {}", this, successor);
// Every tracked transaction proxy is visited here (per-proxy action elided from this excerpt).
387 for (AbstractProxyTransaction t : proxies.values()) {
391 return new ReconnectCohort();
// Hook called by abortTransaction() after the proxy has been removed from 'proxies'. Overridden by Local.
395 void onTransactionAborted(final AbstractProxyTransaction tx) {
396 // No-op for most implementations
// Hook called by completeTransaction() after the proxy has been removed from 'proxies'. Overridden by Local.
400 void onTransactionCompleted(final AbstractProxyTransaction tx) {
401 // No-op for most implementations
// Hook for a transaction being sealed; the caller is not visible in this excerpt. Overridden by Local, which
// records the sealed transaction.
404 void onTransactionSealed(final AbstractProxyTransaction tx) {
405 // No-op on most implementations