Bug 7521: Convert byte[] to ShardManagerSnapshot in DatastoreSnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import java.util.Collection;
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
20 import org.opendaylight.controller.cluster.access.concepts.MemberName;
21 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
27 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Future;
31 import scala.util.Try;
32
33 /**
34  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
35  * transaction factories.
36  */
37 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
38     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
39     @SuppressWarnings("rawtypes")
40     private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
41             AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
42
43     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
44     private final LocalHistoryIdentifier historyId;
45     private final ActorContext actorContext;
46
47     // Used via TX_COUNTER_UPDATER
48     @SuppressWarnings("unused")
49     private volatile long nextTx;
50
51     protected AbstractTransactionContextFactory(final ActorContext actorContext,
52             final LocalHistoryIdentifier historyId) {
53         this.actorContext = Preconditions.checkNotNull(actorContext);
54         this.historyId = Preconditions.checkNotNull(historyId);
55     }
56
57     final ActorContext getActorContext() {
58         return actorContext;
59     }
60
61     final LocalHistoryIdentifier getHistoryId() {
62         return historyId;
63     }
64
65     @SuppressWarnings("checkstyle:IllegalCatch")
66     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
67             final String shardName) {
68         final LocalTransactionFactory local = knownLocal.get(shardName);
69         if (local != null) {
70             LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
71                 shardName, local);
72
73             try {
74                 return createLocalTransactionContext(local, parent);
75             } catch (Exception e) {
76                 return new NoOpTransactionContext(e, parent.getIdentifier());
77             }
78         }
79
80         return null;
81     }
82
83     private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
84             String shardName, TransactionContextWrapper transactionContextWrapper) {
85         LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
86                 primaryShardInfo.getPrimaryShardActor(), shardName);
87
88         updateShardInfo(shardName, primaryShardInfo);
89
90         try {
91             TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
92             if (localContext != null) {
93                 transactionContextWrapper.executePriorTransactionOperations(localContext);
94             } else {
95                 RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
96                         parent, shardName);
97                 remote.setPrimaryShard(primaryShardInfo);
98             }
99         } finally {
100             onTransactionContextCreated(parent.getIdentifier());
101         }
102     }
103
104     private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
105             String shardName, TransactionContextWrapper transactionContextWrapper) {
106         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
107
108         try {
109             transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
110                     parent.getIdentifier()));
111         } finally {
112             onTransactionContextCreated(parent.getIdentifier());
113         }
114     }
115
116     final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
117             final String shardName) {
118         final TransactionContextWrapper transactionContextWrapper =
119                 new TransactionContextWrapper(parent.getIdentifier(), actorContext);
120
121         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
122         if (findPrimaryFuture.isCompleted()) {
123             Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
124             if (maybe.isSuccess()) {
125                 onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
126             } else {
127                 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
128             }
129         } else {
130             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
131                 @Override
132                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
133                     if (failure == null) {
134                         onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
135                     } else {
136                         onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
137                     }
138                 }
139             }, actorContext.getClientDispatcher());
140         }
141
142         return transactionContextWrapper;
143     }
144
145     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
146         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
147         if (maybeDataTree.isPresent()) {
148             if (!knownLocal.containsKey(shardName)) {
149                 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
150
151                 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
152                 knownLocal.putIfAbsent(shardName, factory);
153             }
154         } else if (knownLocal.containsKey(shardName)) {
155             LOG.debug("Shard {} invalidating local data tree", shardName);
156
157             knownLocal.remove(shardName);
158         }
159     }
160
161     protected final MemberName getMemberName() {
162         return historyId.getClientId().getFrontendId().getMemberName();
163     }
164
165     /**
166      * Create an identifier for the next TransactionProxy attached to this component
167      * factory.
168      * @return Transaction identifier, may not be null.
169      */
170     protected final TransactionIdentifier nextIdentifier() {
171         return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
172     }
173
174     /**
175      * Find the primary shard actor.
176      *
177      * @param shardName Shard name
178      * @return Future containing shard information.
179      */
180     protected abstract Future<PrimaryShardInfo> findPrimaryShard(@Nonnull String shardName,
181             @Nonnull TransactionIdentifier txId);
182
183     /**
184      * Create local transaction factory for specified shard, backed by specified shard leader
185      * and data tree instance.
186      *
187      * @param shardName the shard name
188      * @param shardLeader the shard leader
189      * @param dataTree Backing data tree instance. The data tree may only be accessed in
190      *                 read-only manner.
191      * @return Transaction factory for local use.
192      */
193     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
194
195     /**
196      * Callback invoked from child transactions to push any futures, which need to
197      * be waited for before the next transaction is allocated.
198      * @param cohortFutures Collection of futures
199      */
200     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction,
201             @Nonnull Collection<Future<T>> cohortFutures);
202
203     /**
204      * Callback invoked when the internal TransactionContext has been created for a transaction.
205      *
206      * @param transactionId the ID of the transaction.
207      */
208     protected abstract void onTransactionContextCreated(@Nonnull TransactionIdentifier transactionId);
209
210     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
211                                                                     final TransactionProxy parent) {
212
213         switch (parent.getType()) {
214             case READ_ONLY:
215                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
216                 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
217                     @Override
218                     protected DOMStoreWriteTransaction getWriteDelegate() {
219                         throw new UnsupportedOperationException();
220                     }
221
222                     @Override
223                     protected DOMStoreReadTransaction getReadDelegate() {
224                         return readOnly;
225                     }
226                 };
227             case READ_WRITE:
228                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
229                 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
230                     @Override
231                     protected DOMStoreWriteTransaction getWriteDelegate() {
232                         return readWrite;
233                     }
234
235                     @Override
236                     protected DOMStoreReadTransaction getReadDelegate() {
237                         return readWrite;
238                     }
239                 };
240             case WRITE_ONLY:
241                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
242                 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
243                     @Override
244                     protected DOMStoreWriteTransaction getWriteDelegate() {
245                         return writeOnly;
246                     }
247
248                     @Override
249                     protected DOMStoreReadTransaction getReadDelegate() {
250                         throw new UnsupportedOperationException();
251                     }
252                 };
253             default:
254                 throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
255         }
256     }
257 }