Migrate users of Optional.get()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSelection;
13 import java.util.Collection;
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18 import org.eclipse.jdt.annotation.NonNull;
19 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
20 import org.opendaylight.controller.cluster.access.concepts.MemberName;
21 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
24 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
25 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
26 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
27 import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.util.Try;
32
33 /**
34  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
35  * transaction factories.
36  */
37 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
38     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
39     @SuppressWarnings("rawtypes")
40     private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
41             AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
42
43     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
44     private final @NonNull LocalHistoryIdentifier historyId;
45     private final @NonNull ActorUtils actorUtils;
46
47     // Used via TX_COUNTER_UPDATER
48     @SuppressWarnings("unused")
49     private volatile long nextTx;
50
51     protected AbstractTransactionContextFactory(final ActorUtils actorUtils, final LocalHistoryIdentifier historyId) {
52         this.actorUtils = requireNonNull(actorUtils);
53         this.historyId = requireNonNull(historyId);
54     }
55
56     final ActorUtils getActorUtils() {
57         return actorUtils;
58     }
59
60     final LocalHistoryIdentifier getHistoryId() {
61         return historyId;
62     }
63
64     @SuppressWarnings("checkstyle:IllegalCatch")
65     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
66             final String shardName) {
67         final LocalTransactionFactory local = knownLocal.get(shardName);
68         if (local != null) {
69             LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
70                 shardName, local);
71
72             try {
73                 return createLocalTransactionContext(local, parent);
74             } catch (Exception e) {
75                 return new NoOpTransactionContext(e, parent.getIdentifier());
76             }
77         }
78
79         return null;
80     }
81
82     private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
83             final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
84             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
85         LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
86                 parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
87
88         updateShardInfo(shardName, primaryShardInfo);
89
90         final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
91         try {
92             if (localContext != null) {
93                 LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
94                         parent.getIdentifier());
95                 return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
96                         localContext);
97             }
98
99             LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
100                 parent.getIdentifier());
101             final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
102                 transactionContextWrapper, parent, shardName);
103             remote.setPrimaryShard(primaryShardInfo);
104             return transactionContextWrapper;
105         } finally {
106             onTransactionContextCreated(parent.getIdentifier());
107         }
108     }
109
110     private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
111             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
112         LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
113                 primaryShardInfo.getPrimaryShardActor(), shardName);
114
115         updateShardInfo(shardName, primaryShardInfo);
116
117         final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
118         try {
119             if (localContext != null) {
120                 transactionContextWrapper.executePriorTransactionOperations(localContext);
121             } else {
122                 final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
123                         transactionContextWrapper, parent, shardName);
124                 remote.setPrimaryShard(primaryShardInfo);
125             }
126         } finally {
127             onTransactionContextCreated(parent.getIdentifier());
128         }
129     }
130
131     private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
132             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
133         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
134
135         try {
136             transactionContextWrapper.executePriorTransactionOperations(
137                     new NoOpTransactionContext(failure, parent.getIdentifier()));
138         } finally {
139             onTransactionContextCreated(parent.getIdentifier());
140         }
141     }
142
143     final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
144             final String shardName) {
145         final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
146                 parent.getIdentifier(), actorUtils, shardName);
147         final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
148         if (findPrimaryFuture.isCompleted()) {
149             final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
150             if (maybe.isSuccess()) {
151                 return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName, contextWrapper);
152             }
153
154             onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, contextWrapper);
155         } else {
156             findPrimaryFuture.onComplete(result -> {
157                 if (result.isSuccess()) {
158                     onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
159                 } else {
160                     onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
161                 }
162                 return null;
163             }, actorUtils.getClientDispatcher());
164         }
165         return contextWrapper;
166     }
167
168     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
169         final Optional<ReadOnlyDataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
170         if (maybeDataTree.isPresent()) {
171             if (!knownLocal.containsKey(shardName)) {
172                 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
173
174                 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(),
175                     maybeDataTree.orElseThrow());
176                 knownLocal.putIfAbsent(shardName, factory);
177             }
178         } else if (knownLocal.containsKey(shardName)) {
179             LOG.debug("Shard {} invalidating local data tree", shardName);
180
181             knownLocal.remove(shardName);
182         }
183     }
184
185     protected final MemberName getMemberName() {
186         return historyId.getClientId().getFrontendId().getMemberName();
187     }
188
189     /**
190      * Create an identifier for the next TransactionProxy attached to this component
191      * factory.
192      * @return Transaction identifier, may not be null.
193      */
194     protected final TransactionIdentifier nextIdentifier() {
195         return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
196     }
197
198     /**
199      * Find the primary shard actor.
200      *
201      * @param shardName Shard name
202      * @return Future containing shard information.
203      */
204     protected abstract Future<PrimaryShardInfo> findPrimaryShard(@NonNull String shardName,
205             @NonNull TransactionIdentifier txId);
206
207     /**
208      * Create local transaction factory for specified shard, backed by specified shard leader
209      * and data tree instance.
210      *
211      * @param shardName the shard name
212      * @param shardLeader the shard leader
213      * @param dataTree Backing data tree instance. The data tree may only be accessed in
214      *                 read-only manner.
215      * @return Transaction factory for local use.
216      */
217     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, ReadOnlyDataTree dataTree);
218
219     /**
220      * Callback invoked from child transactions to push any futures, which need to
221      * be waited for before the next transaction is allocated.
222      * @param cohortFutures Collection of futures
223      */
224     protected abstract <T> void onTransactionReady(@NonNull TransactionIdentifier transaction,
225             @NonNull Collection<Future<T>> cohortFutures);
226
227     /**
228      * Callback invoked when the internal TransactionContext has been created for a transaction.
229      *
230      * @param transactionId the ID of the transaction.
231      */
232     protected abstract void onTransactionContextCreated(@NonNull TransactionIdentifier transactionId);
233
234     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
235                                                                     final TransactionProxy parent) {
236         return switch (parent.getType()) {
237             case READ_ONLY -> {
238                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
239                 yield new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
240                     @Override
241                     DOMStoreWriteTransaction getWriteDelegate() {
242                         throw new UnsupportedOperationException();
243                     }
244
245                     @Override
246                     DOMStoreReadTransaction getReadDelegate() {
247                         return readOnly;
248                     }
249                 };
250             }
251             case READ_WRITE -> {
252                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
253                 yield new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
254                     @Override
255                     DOMStoreWriteTransaction getWriteDelegate() {
256                         return readWrite;
257                     }
258
259                     @Override
260                     DOMStoreReadTransaction getReadDelegate() {
261                         return readWrite;
262                     }
263                 };
264             }
265             case WRITE_ONLY -> {
266                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
267                 yield new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
268                     @Override
269                     DOMStoreWriteTransaction getWriteDelegate() {
270                         return writeOnly;
271                     }
272
273                     @Override
274                     DOMStoreReadTransaction getReadDelegate() {
275                         throw new UnsupportedOperationException();
276                     }
277                 };
278             }
279             default -> throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
280         };
281     }
282 }