BUG-5280: use MemberName instead of String
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import java.util.Collection;
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLong;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.cluster.access.concepts.MemberName;
20 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
21 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
22 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
26 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Future;
30 import scala.util.Try;
31
32 /**
33  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
34  * transaction factories.
35  */
36 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
37     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
38     private static final MemberName UNKNOWN_MEMBER = MemberName.forName("UNKNOWN-MEMBER");
39
40     protected static final AtomicLong TX_COUNTER = new AtomicLong();
41
42     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
43     private final ActorContext actorContext;
44
45     protected AbstractTransactionContextFactory(final ActorContext actorContext) {
46         this.actorContext = Preconditions.checkNotNull(actorContext);
47     }
48
49     final ActorContext getActorContext() {
50         return actorContext;
51     }
52
53     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
54         final LocalTransactionFactory local = knownLocal.get(shardName);
55         if (local != null) {
56             if(LOG.isDebugEnabled()) {
57                 LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
58                         parent.getIdentifier(), shardName, local);
59             }
60
61             try {
62                 return createLocalTransactionContext(local, parent);
63             } catch(Exception e) {
64                 return new NoOpTransactionContext(e, parent.getIdentifier());
65             }
66         }
67
68         return null;
69     }
70
71     private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
72             String shardName, TransactionContextWrapper transactionContextWrapper) {
73         if(LOG.isDebugEnabled()) {
74             LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
75                     primaryShardInfo.getPrimaryShardActor(), shardName);
76         }
77
78         updateShardInfo(shardName, primaryShardInfo);
79
80         try {
81             TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
82             if(localContext != null) {
83                 transactionContextWrapper.executePriorTransactionOperations(localContext);
84             } else {
85                 RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
86                         parent, shardName);
87                 remote.setPrimaryShard(primaryShardInfo);
88             }
89         } finally {
90             onTransactionContextCreated(parent.getIdentifier());
91         }
92     }
93
94     private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
95             String shardName, TransactionContextWrapper transactionContextWrapper) {
96         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
97
98         try {
99             transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
100                     parent.getIdentifier()));
101         } finally {
102             onTransactionContextCreated(parent.getIdentifier());
103         }
104     }
105
106     final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
107         final TransactionContextWrapper transactionContextWrapper =
108                 new TransactionContextWrapper(parent.getIdentifier(), actorContext);
109
110         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
111         if(findPrimaryFuture.isCompleted()) {
112             Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
113             if(maybe.isSuccess()) {
114                 onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
115             } else {
116                 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
117             }
118         } else {
119             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
120                 @Override
121                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
122                     if (failure == null) {
123                         onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
124                     } else {
125                         onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
126                     }
127                 }
128             }, actorContext.getClientDispatcher());
129         }
130
131         return transactionContextWrapper;
132     }
133
134     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
135         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
136         if (maybeDataTree.isPresent()) {
137             if(!knownLocal.containsKey(shardName)) {
138                 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
139
140                 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
141                 knownLocal.putIfAbsent(shardName, factory);
142             }
143         } else if(knownLocal.containsKey(shardName)) {
144             LOG.debug("Shard {} invalidating local data tree", shardName);
145
146             knownLocal.remove(shardName);
147         }
148     }
149
150     protected MemberName getMemberName() {
151         final MemberName ret = getActorContext().getCurrentMemberName();
152         return ret == null ? UNKNOWN_MEMBER : ret;
153     }
154
155     /**
156      * Create an identifier for the next TransactionProxy attached to this component
157      * factory.
158      * @return Transaction identifier, may not be null.
159      */
160     protected abstract TransactionIdentifier nextIdentifier();
161
162     /**
163      * Find the primary shard actor.
164      *
165      * @param shardName Shard name
166      * @return Future containing shard information.
167      */
168     protected abstract Future<PrimaryShardInfo> findPrimaryShard(@Nonnull String shardName,
169             @Nonnull TransactionIdentifier txId);
170
171     /**
172      * Create local transaction factory for specified shard, backed by specified shard leader
173      * and data tree instance.
174      *
175      * @param shardName
176      * @param shardLeader
177      * @param dataTree Backing data tree instance. The data tree may only be accessed in
178      *                 read-only manner.
179      * @return Transaction factory for local use.
180      */
181     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
182
183     /**
184      * Callback invoked from child transactions to push any futures, which need to
185      * be waited for before the next transaction is allocated.
186      * @param cohortFutures Collection of futures
187      */
188     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
189
190     /**
191      * Callback invoked when the internal TransactionContext has been created for a transaction.
192      *
193      * @param transactionId the ID of the transaction.
194      */
195     protected abstract void onTransactionContextCreated(@Nonnull TransactionIdentifier transactionId);
196
197     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
198                                                                     final TransactionProxy parent) {
199
200         switch(parent.getType()) {
201             case READ_ONLY:
202                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
203                 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
204                     @Override
205                     protected DOMStoreWriteTransaction getWriteDelegate() {
206                         throw new UnsupportedOperationException();
207                     }
208
209                     @Override
210                     protected DOMStoreReadTransaction getReadDelegate() {
211                         return readOnly;
212                     }
213                 };
214             case READ_WRITE:
215                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
216                 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
217                     @Override
218                     protected DOMStoreWriteTransaction getWriteDelegate() {
219                         return readWrite;
220                     }
221
222                     @Override
223                     protected DOMStoreReadTransaction getReadDelegate() {
224                         return readWrite;
225                     }
226                 };
227             case WRITE_ONLY:
228                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
229                 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
230                     @Override
231                     protected DOMStoreWriteTransaction getWriteDelegate() {
232                         return writeOnly;
233                     }
234
235                     @Override
236                     protected DOMStoreReadTransaction getReadDelegate() {
237                         throw new UnsupportedOperationException();
238                     }
239                 };
240              default:
241                  throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
242         }
243     }
244 }