Use java.util.Optional in ShardLeaderStateChanged and PrimaryShardInfo
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractTransactionContextFactory.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import java.util.Collection;
14 import java.util.Optional;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.atomic.AtomicLong;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
25 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.util.Try;
30
31 /**
32  * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
33  * transaction factories.
34  */
35 abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
36     private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
37
38     protected static final AtomicLong TX_COUNTER = new AtomicLong();
39
40     private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
41     private final ActorContext actorContext;
42
43     protected AbstractTransactionContextFactory(final ActorContext actorContext) {
44         this.actorContext = Preconditions.checkNotNull(actorContext);
45     }
46
47     final ActorContext getActorContext() {
48         return actorContext;
49     }
50
51     private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
52         final LocalTransactionFactory local = knownLocal.get(shardName);
53         if (local != null) {
54             if(LOG.isDebugEnabled()) {
55                 LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
56                         parent.getIdentifier(), shardName, local);
57             }
58
59             try {
60                 return createLocalTransactionContext(local, parent);
61             } catch(Exception e) {
62                 return new NoOpTransactionContext(e, parent.getIdentifier());
63             }
64         }
65
66         return null;
67     }
68
69     private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
70             String shardName, TransactionContextWrapper transactionContextWrapper) {
71         if(LOG.isDebugEnabled()) {
72             LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
73                     primaryShardInfo.getPrimaryShardActor(), shardName);
74         }
75
76         updateShardInfo(shardName, primaryShardInfo);
77
78         try {
79             TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
80             if(localContext != null) {
81                 transactionContextWrapper.executePriorTransactionOperations(localContext);
82             } else {
83                 RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
84                         parent, shardName);
85                 remote.setPrimaryShard(primaryShardInfo);
86             }
87         } finally {
88             onTransactionContextCreated(parent.getIdentifier());
89         }
90     }
91
92     private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
93             String shardName, TransactionContextWrapper transactionContextWrapper) {
94         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
95
96         try {
97             transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
98                     parent.getIdentifier()));
99         } finally {
100             onTransactionContextCreated(parent.getIdentifier());
101         }
102     }
103
104     final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
105         final TransactionContextWrapper transactionContextWrapper =
106                 new TransactionContextWrapper(parent.getIdentifier(), actorContext);
107
108         Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
109         if(findPrimaryFuture.isCompleted()) {
110             Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
111             if(maybe.isSuccess()) {
112                 onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
113             } else {
114                 onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
115             }
116         } else {
117             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
118                 @Override
119                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
120                     if (failure == null) {
121                         onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
122                     } else {
123                         onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
124                     }
125                 }
126             }, actorContext.getClientDispatcher());
127         }
128
129         return transactionContextWrapper;
130     }
131
132     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
133         final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
134         if (maybeDataTree.isPresent()) {
135             if(!knownLocal.containsKey(shardName)) {
136                 LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
137
138                 F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
139                 knownLocal.putIfAbsent(shardName, factory);
140             }
141         } else if(knownLocal.containsKey(shardName)) {
142             LOG.debug("Shard {} invalidating local data tree", shardName);
143
144             knownLocal.remove(shardName);
145         }
146     }
147
148     protected String getMemberName() {
149         String memberName = getActorContext().getCurrentMemberName();
150         if (memberName == null) {
151             memberName = "UNKNOWN-MEMBER";
152         }
153
154         return memberName;
155     }
156
157     /**
158      * Create an identifier for the next TransactionProxy attached to this component
159      * factory.
160      * @return Transaction identifier, may not be null.
161      */
162     protected abstract TransactionIdentifier nextIdentifier();
163
164     /**
165      * Find the primary shard actor.
166      *
167      * @param shardName Shard name
168      * @return Future containing shard information.
169      */
170     protected abstract Future<PrimaryShardInfo> findPrimaryShard(@Nonnull String shardName,
171             @Nonnull TransactionIdentifier txId);
172
173     /**
174      * Create local transaction factory for specified shard, backed by specified shard leader
175      * and data tree instance.
176      *
177      * @param shardName
178      * @param shardLeader
179      * @param dataTree Backing data tree instance. The data tree may only be accessed in
180      *                 read-only manner.
181      * @return Transaction factory for local use.
182      */
183     protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
184
185     /**
186      * Callback invoked from child transactions to push any futures, which need to
187      * be waited for before the next transaction is allocated.
188      * @param cohortFutures Collection of futures
189      */
190     protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
191
192     /**
193      * Callback invoked when the internal TransactionContext has been created for a transaction.
194      *
195      * @param transactionId the ID of the transaction.
196      */
197     protected abstract void onTransactionContextCreated(@Nonnull TransactionIdentifier transactionId);
198
199     private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
200                                                                     final TransactionProxy parent) {
201
202         switch(parent.getType()) {
203             case READ_ONLY:
204                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
205                 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
206                     @Override
207                     protected DOMStoreWriteTransaction getWriteDelegate() {
208                         throw new UnsupportedOperationException();
209                     }
210
211                     @Override
212                     protected DOMStoreReadTransaction getReadDelegate() {
213                         return readOnly;
214                     }
215                 };
216             case READ_WRITE:
217                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
218                 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
219                     @Override
220                     protected DOMStoreWriteTransaction getWriteDelegate() {
221                         return readWrite;
222                     }
223
224                     @Override
225                     protected DOMStoreReadTransaction getReadDelegate() {
226                         return readWrite;
227                     }
228                 };
229             case WRITE_ONLY:
230                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
231                 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
232                     @Override
233                     protected DOMStoreWriteTransaction getWriteDelegate() {
234                         return writeOnly;
235                     }
236
237                     @Override
238                     protected DOMStoreReadTransaction getReadDelegate() {
239                         throw new UnsupportedOperationException();
240                     }
241                 };
242              default:
243                  throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
244         }
245     }
246 }