2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Creator;
18 import akka.persistence.RecoveryFailure;
19 import akka.serialization.Serialization;
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.util.concurrent.CheckedFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.protobuf.ByteString;
28 import com.google.protobuf.InvalidProtocolBufferException;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
31 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
32 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
39 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
44 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.controller.cluster.datastore.modification.Modification;
47 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
49 import org.opendaylight.controller.cluster.raft.ConfigParams;
50 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
51 import org.opendaylight.controller.cluster.raft.RaftActor;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
55 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
56 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
57 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
58 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
59 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
60 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
61 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
63 import org.opendaylight.yangtools.concepts.ListenerRegistration;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
65 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67 import scala.concurrent.duration.FiniteDuration;
69 import java.util.ArrayList;
70 import java.util.Date;
71 import java.util.HashMap;
72 import java.util.List;
74 import java.util.concurrent.ExecutionException;
75 import java.util.concurrent.TimeUnit;
78 * A Shard represents a portion of the logical data tree <br/>
80 * Our Shard uses InMemoryDataStore as its internal representation and delegates all requests it
83 public class Shard extends RaftActor {
85 private static final ConfigParams configParams = new ShardConfigParams();
87 public static final String DEFAULT_NAME = "default";
89 // The state of this Shard
90 private final InMemoryDOMDataStore store;
92 private final Map<Object, DOMStoreThreePhaseCommitCohort>
93 modificationToCohort = new HashMap<>();
95 private final LoggingAdapter LOG =
96 Logging.getLogger(getContext().system(), this);
98 // By default persistent will be true and can be turned off using the system
99 // property shard.persistent
100 private final boolean persistent;
102 /// The name of this shard
103 private final ShardIdentifier name;
105 private final ShardStats shardMBean;
107 private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
109 private final DatastoreContext datastoreContext;
112 private SchemaContext schemaContext;
114 private ActorRef createSnapshotTransaction;
116 private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
117 DatastoreContext datastoreContext, SchemaContext schemaContext) {
118 super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
121 this.datastoreContext = datastoreContext;
122 this.schemaContext = schemaContext;
124 String setting = System.getProperty("shard.persistent");
126 this.persistent = !"false".equals(setting);
128 LOG.info("Shard created : {} persistent : {}", name, persistent);
130 store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
131 datastoreContext.getDataStoreProperties());
133 if(schemaContext != null) {
134 store.onGlobalContextUpdated(schemaContext);
137 shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
142 private static Map<String, String> mapPeerAddresses(
143 Map<ShardIdentifier, String> peerAddresses) {
144 Map<String, String> map = new HashMap<>();
146 for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
148 map.put(entry.getKey().toString(), entry.getValue());
154 public static Props props(final ShardIdentifier name,
155 final Map<ShardIdentifier, String> peerAddresses,
156 DatastoreContext datastoreContext, SchemaContext schemaContext) {
157 Preconditions.checkNotNull(name, "name should not be null");
158 Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
159 Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
160 Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
162 return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
165 @Override public void onReceiveRecover(Object message) {
166 LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
169 if (message instanceof RecoveryFailure){
170 LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
172 super.onReceiveRecover(message);
176 @Override public void onReceiveCommand(Object message) {
177 LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
180 if (message.getClass()
181 .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
183 createTransactionChain();
184 } else if (getLeader() != null) {
185 getLeader().forward(message, getContext());
187 } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
188 // This must be for install snapshot. Don't want to open this up and trigger
190 self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
192 // Send a PoisonPill instead of sending close transaction because we do not really need
194 getSender().tell(PoisonPill.getInstance(), self());
196 } else if (message instanceof RegisterChangeListener) {
197 registerChangeListener((RegisterChangeListener) message);
198 } else if (message instanceof UpdateSchemaContext) {
199 updateSchemaContext((UpdateSchemaContext) message);
200 } else if (message instanceof ForwardedCommitTransaction) {
201 handleForwardedCommit((ForwardedCommitTransaction) message);
202 } else if (message.getClass()
203 .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
205 createTransaction(CreateTransaction.fromSerializable(message));
206 } else if (getLeader() != null) {
207 getLeader().forward(message, getContext());
209 } else if (message instanceof PeerAddressResolved) {
210 PeerAddressResolved resolved = (PeerAddressResolved) message;
211 setPeerAddress(resolved.getPeerId().toString(),
212 resolved.getPeerAddress());
214 super.onReceiveCommand(message);
218 private ActorRef createTypedTransactionActor(
220 ShardTransactionIdentifier transactionId) {
222 if(this.schemaContext == null){
223 throw new NullPointerException("schemaContext should not be null");
227 == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
229 shardMBean.incrementReadOnlyTransactionCount();
231 return getContext().actorOf(
232 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
233 schemaContext,datastoreContext, name.toString()), transactionId.toString());
235 } else if (transactionType
236 == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
238 shardMBean.incrementReadWriteTransactionCount();
240 return getContext().actorOf(
241 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
242 schemaContext, datastoreContext,name.toString()), transactionId.toString());
245 } else if (transactionType
246 == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
248 shardMBean.incrementWriteOnlyTransactionCount();
250 return getContext().actorOf(
251 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
252 schemaContext, datastoreContext, name.toString()), transactionId.toString());
254 throw new IllegalArgumentException(
255 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
260 private void createTransaction(CreateTransaction createTransaction) {
261 createTransaction(createTransaction.getTransactionType(),
262 createTransaction.getTransactionId());
265 private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
267 ShardTransactionIdentifier transactionId =
268 ShardTransactionIdentifier.builder()
269 .remoteTransactionId(remoteTransactionId)
271 LOG.debug("Creating transaction : {} ", transactionId);
272 ActorRef transactionActor =
273 createTypedTransactionActor(transactionType, transactionId);
276 .tell(new CreateTransactionReply(
277 Serialization.serializedActorPath(transactionActor),
278 remoteTransactionId).toSerializable(),
281 return transactionActor;
284 private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
285 throws ExecutionException, InterruptedException {
286 DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
287 commitCohort.preCommit().get();
288 commitCohort.commit().get();
292 private void commit(final ActorRef sender, Object serialized) {
293 Modification modification = MutableCompositeModification
294 .fromSerializable(serialized, schemaContext);
295 DOMStoreThreePhaseCommitCohort cohort =
296 modificationToCohort.remove(serialized);
297 if (cohort == null) {
299 "Could not find cohort for modification : {}. Writing modification using a new transaction",
301 DOMStoreWriteTransaction transaction =
302 store.newWriteOnlyTransaction();
303 modification.apply(transaction);
305 syncCommitTransaction(transaction);
306 } catch (InterruptedException | ExecutionException e) {
307 shardMBean.incrementFailedTransactionsCount();
308 LOG.error("Failed to commit", e);
311 //we want to just apply the recovery commit and return
312 shardMBean.incrementCommittedTransactionCount();
316 final ListenableFuture<Void> future = cohort.commit();
317 final ActorRef self = getSelf();
319 Futures.addCallback(future, new FutureCallback<Void>() {
321 public void onSuccess(Void v) {
322 sender.tell(new CommitTransactionReply().toSerializable(), self);
323 shardMBean.incrementCommittedTransactionCount();
324 shardMBean.setLastCommittedTransactionTime(new Date());
328 public void onFailure(Throwable t) {
329 LOG.error(t, "An exception happened during commit");
330 shardMBean.incrementFailedTransactionsCount();
331 sender.tell(new akka.actor.Status.Failure(t), self);
337 private void handleForwardedCommit(ForwardedCommitTransaction message) {
338 Object serializedModification =
339 message.getModification().toSerializable();
342 .put(serializedModification, message.getCohort());
345 this.persistData(getSender(), "identifier",
346 new CompositeModificationPayload(serializedModification));
348 this.commit(getSender(), serializedModification);
352 private void updateSchemaContext(UpdateSchemaContext message) {
353 this.schemaContext = message.getSchemaContext();
354 updateSchemaContext(message.getSchemaContext());
355 store.onGlobalContextUpdated(message.getSchemaContext());
358 @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
359 store.onGlobalContextUpdated(schemaContext);
362 private void registerChangeListener(
363 RegisterChangeListener registerChangeListener) {
365 LOG.debug("registerDataChangeListener for {}", registerChangeListener
369 ActorSelection dataChangeListenerPath = getContext()
370 .system().actorSelection(
371 registerChangeListener.getDataChangeListenerPath());
374 // Notify the listener if notifications should be enabled or not
375 // If this shard is the leader then it will enable notifications else
377 dataChangeListenerPath
378 .tell(new EnableNotification(isLeader()), getSelf());
380 // Now store a reference to the data change listener so it can be notified
381 // at a later point if notifications should be enabled or disabled
382 dataChangeListeners.add(dataChangeListenerPath);
384 AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
385 listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
387 ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
388 registration = store.registerChangeListener(registerChangeListener.getPath(),
389 listener, registerChangeListener.getScope());
390 ActorRef listenerRegistration =
391 getContext().actorOf(
392 DataChangeListenerRegistration.props(registration));
395 "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
396 , listenerRegistration.path().toString());
399 .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
403 private void createTransactionChain() {
404 DOMStoreTransactionChain chain = store.createTransactionChain();
405 ActorRef transactionChain = getContext().actorOf(
406 ShardTransactionChain.props(chain, schemaContext, datastoreContext,name.toString() ));
407 getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
411 @Override protected void applyState(ActorRef clientActor, String identifier,
414 if (data instanceof CompositeModificationPayload) {
415 Object modification =
416 ((CompositeModificationPayload) data).getModification();
418 if (modification != null) {
419 commit(clientActor, modification);
422 "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
423 identifier, clientActor.path().toString());
427 LOG.error("Unknown state received {}", data);
431 ReplicatedLogEntry lastLogEntry = getLastLogEntry();
433 if (lastLogEntry != null) {
434 shardMBean.setLastLogIndex(lastLogEntry.getIndex());
435 shardMBean.setLastLogTerm(lastLogEntry.getTerm());
438 shardMBean.setCommitIndex(getCommitIndex());
439 shardMBean.setLastApplied(getLastApplied());
443 @Override protected void createSnapshot() {
444 if (createSnapshotTransaction == null) {
446 // Create a transaction. We are really going to treat the transaction as a worker
447 // so that this actor does not get block building the snapshot
448 createSnapshotTransaction = createTransaction(
449 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
452 createSnapshotTransaction.tell(
453 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
458 @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
459 // Since this will be done only on Recovery or when this actor is a Follower
460 // we can safely commit everything in here. We not need to worry about event notifications
461 // as they would have already been disabled on the follower
463 DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
464 NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
465 NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
466 .decode(YangInstanceIdentifier.builder().build(), serializedNode);
468 // delete everything first
469 transaction.delete(YangInstanceIdentifier.builder().build());
471 // Add everything from the remote node back
472 transaction.write(YangInstanceIdentifier.builder().build(), node);
473 syncCommitTransaction(transaction);
474 } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
475 LOG.error(e, "An exception occurred when applying snapshot");
479 @Override protected void onStateChanged() {
480 for (ActorSelection dataChangeListener : dataChangeListeners) {
482 .tell(new EnableNotification(isLeader()), getSelf());
485 if (getLeaderId() != null) {
486 shardMBean.setLeader(getLeaderId());
489 shardMBean.setRaftState(getRaftState().name());
490 shardMBean.setCurrentTerm(getCurrentTerm());
493 @Override public String persistenceId() {
494 return this.name.toString();
498 private static class ShardConfigParams extends DefaultConfigParamsImpl {
499 public static final FiniteDuration HEART_BEAT_INTERVAL =
500 new FiniteDuration(500, TimeUnit.MILLISECONDS);
502 @Override public FiniteDuration getHeartBeatInterval() {
503 return HEART_BEAT_INTERVAL;
507 private static class ShardCreator implements Creator<Shard> {
509 private static final long serialVersionUID = 1L;
511 final ShardIdentifier name;
512 final Map<ShardIdentifier, String> peerAddresses;
513 final DatastoreContext datastoreContext;
514 final SchemaContext schemaContext;
516 ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
517 DatastoreContext datastoreContext, SchemaContext schemaContext) {
519 this.peerAddresses = peerAddresses;
520 this.datastoreContext = datastoreContext;
521 this.schemaContext = schemaContext;
525 public Shard create() throws Exception {
526 return new Shard(name, peerAddresses, datastoreContext, schemaContext);
530 @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
531 DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
533 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
534 transaction.read(YangInstanceIdentifier.builder().build());
536 NormalizedNode<?, ?> node = future.get().get();
543 @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
544 throws ExecutionException, InterruptedException {
545 DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
547 transaction.write(id, node);
549 syncCommitTransaction(transaction);