Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorSelection;
13 import java.util.Collection;
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18 import org.eclipse.jdt.annotation.NonNull;
19 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
20 import org.opendaylight.controller.cluster.access.concepts.MemberName;
21 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
24 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
25 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
26 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
27 import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.util.Try;
32
33 /**
34  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
35  * transaction factories.
36  */
37 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
38     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
39     @SuppressWarnings("rawtypes")
40     private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
41             AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
42
43     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
44     private final @NonNull LocalHistoryIdentifier historyId;
45     private final @NonNull ActorUtils actorUtils;
46
47     // Used via TX_COUNTER_UPDATER
48     @SuppressWarnings("unused")
49     private volatile long nextTx;
50
51     protected AbstractTransactionContextFactory(final ActorUtils actorUtils, final LocalHistoryIdentifier historyId) {
52         this.actorUtils = requireNonNull(actorUtils);
53         this.historyId = requireNonNull(historyId);
54     }
55
56     final ActorUtils getActorUtils() {
57         return actorUtils;
58     }
59
60     final LocalHistoryIdentifier getHistoryId() {
61         return historyId;
62     }
63
64     @SuppressWarnings("checkstyle:IllegalCatch")
65     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
66             final String shardName) {
67         final LocalTransactionFactory local = knownLocal.get(shardName);
68         if (local != null) {
69             LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
70                 shardName, local);
71
72             try {
73                 return createLocalTransactionContext(local, parent);
74             } catch (Exception e) {
75                 return new NoOpTransactionContext(e, parent.getIdentifier());
76             }
77         }
78
79         return null;
80     }
81
82     private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
83             final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
84             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
85         LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
86                 parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
87
88         updateShardInfo(shardName, primaryShardInfo);
89
90         final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
91         try {
92             if (localContext != null) {
93                 LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
94                         parent.getIdentifier());
95                 return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
96                         localContext);
97             }
98
99             LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
100                 parent.getIdentifier());
101             final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
102                 transactionContextWrapper, parent, shardName);
103             remote.setPrimaryShard(primaryShardInfo);
104             return transactionContextWrapper;
105         } finally {
106             onTransactionContextCreated(parent.getIdentifier());
107         }
108     }
109
110     private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
111             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
112         LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
113                 primaryShardInfo.getPrimaryShardActor(), shardName);
114
115         updateShardInfo(shardName, primaryShardInfo);
116
117         final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
118         try {
119             if (localContext != null) {
120                 transactionContextWrapper.executePriorTransactionOperations(localContext);
121             } else {
122                 final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
123                         transactionContextWrapper, parent, shardName);
124                 remote.setPrimaryShard(primaryShardInfo);
125             }
126         } finally {
127             onTransactionContextCreated(parent.getIdentifier());
128         }
129     }
130
131     private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
132             final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
133         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
134
135         try {
136             transactionContextWrapper.executePriorTransactionOperations(
137                     new NoOpTransactionContext(failure, parent.getIdentifier()));
138         } finally {
139             onTransactionContextCreated(parent.getIdentifier());
140         }
141     }
142
143     final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
144             final String shardName) {
145         final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
146                 parent.getIdentifier(), actorUtils, shardName);
147         final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
148         if (findPrimaryFuture.isCompleted()) {
149             final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
150             if (maybe.isSuccess()) {
151                 return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName, contextWrapper);
152             }
153
154             onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, contextWrapper);
155         } else {
156             findPrimaryFuture.onComplete(result -> {
157                 if (result.isSuccess()) {
158                     onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
159                 } else {
160                     onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
161                 }
162                 return null;
163             }, actorUtils.getClientDispatcher());
164         }
165         return contextWrapper;
166     }
167
168     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
169         final Optional<ReadOnlyDataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
170         if (maybeDataTree.isPresent()) {
171             if (!knownLocal.containsKey(shardName)) {
172                 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
173
174                 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
175                 knownLocal.putIfAbsent(shardName, factory);
176             }
177         } else if (knownLocal.containsKey(shardName)) {
178             LOG.debug("Shard {} invalidating local data tree", shardName);
179
180             knownLocal.remove(shardName);
181         }
182     }
183
184     protected final MemberName getMemberName() {
185         return historyId.getClientId().getFrontendId().getMemberName();
186     }
187
188     /**
189      * Create an identifier for the next TransactionProxy attached to this component
190      * factory.
191      * @return Transaction identifier, may not be null.
192      */
193     protected final TransactionIdentifier nextIdentifier() {
194         return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
195     }
196
197     /**
198      * Find the primary shard actor.
199      *
200      * @param shardName Shard name
201      * @return Future containing shard information.
202      */
203     protected abstract Future<PrimaryShardInfo> findPrimaryShard(@NonNull String shardName,
204             @NonNull TransactionIdentifier txId);
205
206     /**
207      * Create local transaction factory for specified shard, backed by specified shard leader
208      * and data tree instance.
209      *
210      * @param shardName the shard name
211      * @param shardLeader the shard leader
212      * @param dataTree Backing data tree instance. The data tree may only be accessed in
213      *                 read-only manner.
214      * @return Transaction factory for local use.
215      */
216     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, ReadOnlyDataTree dataTree);
217
218     /**
219      * Callback invoked from child transactions to push any futures, which need to
220      * be waited for before the next transaction is allocated.
221      * @param cohortFutures Collection of futures
222      */
223     protected abstract <T> void onTransactionReady(@NonNull TransactionIdentifier transaction,
224             @NonNull Collection<Future<T>> cohortFutures);
225
226     /**
227      * Callback invoked when the internal TransactionContext has been created for a transaction.
228      *
229      * @param transactionId the ID of the transaction.
230      */
231     protected abstract void onTransactionContextCreated(@NonNull TransactionIdentifier transactionId);
232
233     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
234                                                                     final TransactionProxy parent) {
235
236         switch (parent.getType()) {
237             case READ_ONLY:
238                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
239                 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
240                     @Override
241                     DOMStoreWriteTransaction getWriteDelegate() {
242                         throw new UnsupportedOperationException();
243                     }
244
245                     @Override
246                     DOMStoreReadTransaction getReadDelegate() {
247                         return readOnly;
248                     }
249                 };
250             case READ_WRITE:
251                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
252                 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
253                     @Override
254                     DOMStoreWriteTransaction getWriteDelegate() {
255                         return readWrite;
256                     }
257
258                     @Override
259                     DOMStoreReadTransaction getReadDelegate() {
260                         return readWrite;
261                     }
262                 };
263             case WRITE_ONLY:
264                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
265                 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
266                     @Override
267                     DOMStoreWriteTransaction getWriteDelegate() {
268                         return writeOnly;
269                     }
270
271                     @Override
272                     DOMStoreReadTransaction getReadDelegate() {
273                         throw new UnsupportedOperationException();
274                     }
275                 };
276             default:
277                 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
278         }
279     }
280 }