Add UnsignedLongBitmap
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSelection;
13 import java.util.Collection;
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18 import org.eclipse.jdt.annotation.NonNull;
19 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
20 import org.opendaylight.controller.cluster.access.concepts.MemberName;
21 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
24 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
25 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
26 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
27 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.util.Try;
32
33 /**
34  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
35  * transaction factories.
36  */
37 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
38     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
39     @SuppressWarnings("rawtypes")
40     private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
41             AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
42
43     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
44     private final LocalHistoryIdentifier historyId;
45     private final ActorUtils actorUtils;
46
47     // Used via TX_COUNTER_UPDATER
48     @SuppressWarnings("unused")
49     private volatile long nextTx;
50
51     protected AbstractTransactionContextFactory(final ActorUtils actorUtils, final LocalHistoryIdentifier historyId) {
52         this.actorUtils = requireNonNull(actorUtils);
53         this.historyId = requireNonNull(historyId);
54     }
55
56     final ActorUtils getActorUtils() {
57         return actorUtils;
58     }
59
60     final LocalHistoryIdentifier getHistoryId() {
61         return historyId;
62     }
63
64     @SuppressWarnings("checkstyle:IllegalCatch")
65     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
66             final String shardName) {
67         final LocalTransactionFactory local = knownLocal.get(shardName);
68         if (local != null) {
69             LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
70                 shardName, local);
71
72             try {
73                 return createLocalTransactionContext(local, parent);
74             } catch (Exception e) {
75                 return new NoOpTransactionContext(e, parent.getIdentifier());
76             }
77         }
78
79         return null;
80     }
81
82     private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
83             final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
84             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
85         LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
86                 parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
87
88         updateShardInfo(shardName, primaryShardInfo);
89
90         final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
91         try {
92             if (localContext != null) {
93                 LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
94                         parent.getIdentifier());
95                 return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
96                         localContext);
97             } else {
98                 LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
99                         parent.getIdentifier());
100                 final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
101                         transactionContextWrapper, parent, shardName);
102                 remote.setPrimaryShard(primaryShardInfo);
103                 return transactionContextWrapper;
104             }
105         } finally {
106             onTransactionContextCreated(parent.getIdentifier());
107         }
108     }
109
110     private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
111             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
112         LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
113                 primaryShardInfo.getPrimaryShardActor(), shardName);
114
115         updateShardInfo(shardName, primaryShardInfo);
116
117         final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
118         try {
119             if (localContext != null) {
120                 transactionContextWrapper.executePriorTransactionOperations(localContext);
121             } else {
122                 final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
123                         transactionContextWrapper, parent, shardName);
124                 remote.setPrimaryShard(primaryShardInfo);
125             }
126         } finally {
127             onTransactionContextCreated(parent.getIdentifier());
128         }
129     }
130
131     private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
132             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
133         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
134
135         try {
136             transactionContextWrapper.executePriorTransactionOperations(
137                     new NoOpTransactionContext(failure, parent.getIdentifier()));
138         } finally {
139             onTransactionContextCreated(parent.getIdentifier());
140         }
141     }
142
143     final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
144             final String shardName) {
145         final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
146                 parent.getIdentifier(), actorUtils, shardName);
147         final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
148         if (findPrimaryFuture.isCompleted()) {
149             final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
150             if (maybe.isSuccess()) {
151                 return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName,
152                         contextWrapper);
153             } else {
154                 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName,
155                         contextWrapper);
156             }
157         } else {
158             findPrimaryFuture.onComplete(result -> {
159                 if (result.isSuccess()) {
160                     onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
161                 } else {
162                     onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
163                 }
164                 return null;
165             }, actorUtils.getClientDispatcher());
166         }
167         return contextWrapper;
168     }
169
170     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
171         final Optional<ReadOnlyDataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
172         if (maybeDataTree.isPresent()) {
173             if (!knownLocal.containsKey(shardName)) {
174                 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
175
176                 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
177                 knownLocal.putIfAbsent(shardName, factory);
178             }
179         } else if (knownLocal.containsKey(shardName)) {
180             LOG.debug("Shard {} invalidating local data tree", shardName);
181
182             knownLocal.remove(shardName);
183         }
184     }
185
186     protected final MemberName getMemberName() {
187         return historyId.getClientId().getFrontendId().getMemberName();
188     }
189
190     /**
191      * Create an identifier for the next TransactionProxy attached to this component
192      * factory.
193      * @return Transaction identifier, may not be null.
194      */
195     protected final TransactionIdentifier nextIdentifier() {
196         return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
197     }
198
199     /**
200      * Find the primary shard actor.
201      *
202      * @param shardName Shard name
203      * @return Future containing shard information.
204      */
205     protected abstract Future<PrimaryShardInfo> findPrimaryShard(@NonNull String shardName,
206             @NonNull TransactionIdentifier txId);
207
208     /**
209      * Create local transaction factory for specified shard, backed by specified shard leader
210      * and data tree instance.
211      *
212      * @param shardName the shard name
213      * @param shardLeader the shard leader
214      * @param dataTree Backing data tree instance. The data tree may only be accessed in
215      *                 read-only manner.
216      * @return Transaction factory for local use.
217      */
218     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, ReadOnlyDataTree dataTree);
219
220     /**
221      * Callback invoked from child transactions to push any futures, which need to
222      * be waited for before the next transaction is allocated.
223      * @param cohortFutures Collection of futures
224      */
225     protected abstract <T> void onTransactionReady(@NonNull TransactionIdentifier transaction,
226             @NonNull Collection<Future<T>> cohortFutures);
227
228     /**
229      * Callback invoked when the internal TransactionContext has been created for a transaction.
230      *
231      * @param transactionId the ID of the transaction.
232      */
233     protected abstract void onTransactionContextCreated(@NonNull TransactionIdentifier transactionId);
234
235     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
236                                                                     final TransactionProxy parent) {
237
238         switch (parent.getType()) {
239             case READ_ONLY:
240                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
241                 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
242                     @Override
243                     DOMStoreWriteTransaction getWriteDelegate() {
244                         throw new UnsupportedOperationException();
245                     }
246
247                     @Override
248                     DOMStoreReadTransaction getReadDelegate() {
249                         return readOnly;
250                     }
251                 };
252             case READ_WRITE:
253                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
254                 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
255                     @Override
256                     DOMStoreWriteTransaction getWriteDelegate() {
257                         return readWrite;
258                     }
259
260                     @Override
261                     DOMStoreReadTransaction getReadDelegate() {
262                         return readWrite;
263                     }
264                 };
265             case WRITE_ONLY:
266                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
267                 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
268                     @Override
269                     DOMStoreWriteTransaction getWriteDelegate() {
270                         return writeOnly;
271                     }
272
273                     @Override
274                     DOMStoreReadTransaction getReadDelegate() {
275                         throw new UnsupportedOperationException();
276                     }
277                 };
278             default:
279                 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
280         }
281     }
282 }