Metrics and Configuration
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Creator;
18 import akka.persistence.RecoveryFailure;
19 import akka.serialization.Serialization;
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.util.concurrent.CheckedFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.protobuf.ByteString;
28 import com.google.protobuf.InvalidProtocolBufferException;
29 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
30 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
34 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
38 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
39 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
41 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
45 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
46 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
47 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
48 import org.opendaylight.controller.cluster.datastore.modification.Modification;
49 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
50 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
51 import org.opendaylight.controller.cluster.raft.ConfigParams;
52 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
53 import org.opendaylight.controller.cluster.raft.RaftActor;
54 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
55 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
56 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
57 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
58 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
59 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
60 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
61 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
63 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
64 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
65 import org.opendaylight.yangtools.concepts.ListenerRegistration;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
69 import scala.concurrent.duration.FiniteDuration;
70
71 import java.util.ArrayList;
72 import java.util.HashMap;
73 import java.util.List;
74 import java.util.Map;
75 import java.util.concurrent.ExecutionException;
76 import java.util.concurrent.TimeUnit;
77
78 /**
79  * A Shard represents a portion of the logical data tree <br/>
80  * <p>
81  * Our Shard uses InMemoryDataStore as its internal representation and delegates all requests to it
82  * </p>
83  */
84 public class Shard extends RaftActor {
85
    // Raft configuration handed to the RaftActor super constructor; shared by all Shard instances
    private static final ConfigParams configParams = new ShardConfigParams();

    // Presumably the name used for the default shard - not referenced inside this class
    public static final String DEFAULT_NAME = "default";

    // The state of this Shard
    private final InMemoryDOMDataStore store;

    // Cohorts of forwarded commits, keyed by the serialized modification they belong to.
    // Entries are added in handleForwardedCommit and removed again in commit.
    private final Map<Object, DOMStoreThreePhaseCommitCohort>
        modificationToCohort = new HashMap<>();

    // Per-instance Akka logger bound to this actor
    private final LoggingAdapter LOG =
        Logging.getLogger(getContext().system(), this);

    // By default persistent will be true and can be turned off using the system
    // property shard.persistent
    private final boolean persistent;

    // The name of this shard
    private final ShardIdentifier name;

    // JMX statistics bean updated as transactions are created/committed and as the Raft state changes
    private final ShardStats shardMBean;

    // Every data change listener registered so far; each is told whether notifications
    // are enabled whenever the state of this shard changes
    private final List<ActorSelection> dataChangeListeners = new ArrayList<>();

    // Data store settings; also passed on to the transaction actors created by this shard
    private final DatastoreContext datastoreContext;


    // Current schema context; replaced when an UpdateSchemaContext message is received
    private SchemaContext schemaContext;

    // Read-only transaction actor used as a worker while a snapshot is being built
    private ActorRef createSnapshotTransaction;
116
117     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
118             DatastoreContext datastoreContext, SchemaContext schemaContext) {
119         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
120
121         this.name = name;
122         this.datastoreContext = datastoreContext;
123         this.schemaContext = schemaContext;
124
125         String setting = System.getProperty("shard.persistent");
126
127         this.persistent = !"false".equals(setting);
128
129         LOG.info("Shard created : {} persistent : {}", name, persistent);
130
131         store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
132                 datastoreContext.getDataStoreProperties());
133
134         if(schemaContext != null) {
135             store.onGlobalContextUpdated(schemaContext);
136         }
137
138         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
139                 datastoreContext.getDataStoreMXBeanType());
140         shardMBean.setDataStoreExecutor(store.getDomStoreExecutor());
141         shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
142
143         if (isMetricsCaptureEnabled()) {
144             getContext().become(new MeteringBehavior(this));
145         }
146     }
147
148     private static Map<String, String> mapPeerAddresses(
149         Map<ShardIdentifier, String> peerAddresses) {
150         Map<String, String> map = new HashMap<>();
151
152         for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
153             .entrySet()) {
154             map.put(entry.getKey().toString(), entry.getValue());
155         }
156
157         return map;
158     }
159
160     public static Props props(final ShardIdentifier name,
161         final Map<ShardIdentifier, String> peerAddresses,
162         DatastoreContext datastoreContext, SchemaContext schemaContext) {
163         Preconditions.checkNotNull(name, "name should not be null");
164         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
165         Preconditions.checkNotNull(datastoreContext, "dataStoreContext should not be null");
166         Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
167
168         return Props.create(new ShardCreator(name, peerAddresses, datastoreContext, schemaContext));
169     }
170
171     @Override public void onReceiveRecover(Object message) {
172         LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
173             getSender());
174
175         if (message instanceof RecoveryFailure){
176             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
177         } else {
178             super.onReceiveRecover(message);
179         }
180     }
181
182     @Override public void onReceiveCommand(Object message) {
183         LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
184             getSender());
185
186         if (message.getClass()
187             .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
188             if (isLeader()) {
189                 createTransactionChain();
190             } else if (getLeader() != null) {
191                 getLeader().forward(message, getContext());
192             }
193         } else if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
194             // This must be for install snapshot. Don't want to open this up and trigger
195             // deSerialization
196             self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self());
197
198             // Send a PoisonPill instead of sending close transaction because we do not really need
199             // a response
200             getSender().tell(PoisonPill.getInstance(), self());
201
202         } else if (message instanceof RegisterChangeListener) {
203             registerChangeListener((RegisterChangeListener) message);
204         } else if (message instanceof UpdateSchemaContext) {
205             updateSchemaContext((UpdateSchemaContext) message);
206         } else if (message instanceof ForwardedCommitTransaction) {
207             handleForwardedCommit((ForwardedCommitTransaction) message);
208         } else if (message.getClass()
209             .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
210             if (isLeader()) {
211                 createTransaction(CreateTransaction.fromSerializable(message));
212             } else if (getLeader() != null) {
213                 getLeader().forward(message, getContext());
214             }
215         } else if (message instanceof PeerAddressResolved) {
216             PeerAddressResolved resolved = (PeerAddressResolved) message;
217             setPeerAddress(resolved.getPeerId().toString(),
218                 resolved.getPeerAddress());
219         } else {
220             super.onReceiveCommand(message);
221         }
222     }
223
224     private ActorRef createTypedTransactionActor(
225         int transactionType,
226         ShardTransactionIdentifier transactionId) {
227
228         if(this.schemaContext == null){
229             throw new NullPointerException("schemaContext should not be null");
230         }
231
232         if (transactionType
233             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
234
235             shardMBean.incrementReadOnlyTransactionCount();
236
237             return getContext().actorOf(
238                 ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
239                         schemaContext,datastoreContext, shardMBean), transactionId.toString());
240
241         } else if (transactionType
242             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
243
244             shardMBean.incrementReadWriteTransactionCount();
245
246             return getContext().actorOf(
247                 ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
248                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
249
250
251         } else if (transactionType
252             == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
253
254             shardMBean.incrementWriteOnlyTransactionCount();
255
256             return getContext().actorOf(
257                 ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
258                         schemaContext, datastoreContext, shardMBean), transactionId.toString());
259         } else {
260             throw new IllegalArgumentException(
261                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
262                     + transactionType);
263         }
264     }
265
266     private void createTransaction(CreateTransaction createTransaction) {
267         createTransaction(createTransaction.getTransactionType(),
268             createTransaction.getTransactionId());
269     }
270
271     private ActorRef createTransaction(int transactionType, String remoteTransactionId) {
272
273         ShardTransactionIdentifier transactionId =
274             ShardTransactionIdentifier.builder()
275                 .remoteTransactionId(remoteTransactionId)
276                 .build();
277         LOG.debug("Creating transaction : {} ", transactionId);
278         ActorRef transactionActor =
279             createTypedTransactionActor(transactionType, transactionId);
280
281         getSender()
282             .tell(new CreateTransactionReply(
283                     Serialization.serializedActorPath(transactionActor),
284                     remoteTransactionId).toSerializable(),
285                 getSelf());
286
287         return transactionActor;
288     }
289
290     private void syncCommitTransaction(DOMStoreWriteTransaction transaction)
291         throws ExecutionException, InterruptedException {
292         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
293         commitCohort.preCommit().get();
294         commitCohort.commit().get();
295     }
296
297
298     private void commit(final ActorRef sender, Object serialized) {
299         Modification modification = MutableCompositeModification
300             .fromSerializable(serialized, schemaContext);
301         DOMStoreThreePhaseCommitCohort cohort =
302             modificationToCohort.remove(serialized);
303         if (cohort == null) {
304             LOG.debug(
305                 "Could not find cohort for modification : {}. Writing modification using a new transaction",
306                 modification);
307             DOMStoreWriteTransaction transaction =
308                 store.newWriteOnlyTransaction();
309             modification.apply(transaction);
310             try {
311                 syncCommitTransaction(transaction);
312             } catch (InterruptedException | ExecutionException e) {
313                 shardMBean.incrementFailedTransactionsCount();
314                 LOG.error("Failed to commit", e);
315                 return;
316             }
317             //we want to just apply the recovery commit and return
318             shardMBean.incrementCommittedTransactionCount();
319             return;
320         }
321
322         final ListenableFuture<Void> future = cohort.commit();
323         final ActorRef self = getSelf();
324
325         Futures.addCallback(future, new FutureCallback<Void>() {
326             @Override
327             public void onSuccess(Void v) {
328                 sender.tell(new CommitTransactionReply().toSerializable(), self);
329                 shardMBean.incrementCommittedTransactionCount();
330                 shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
331             }
332
333             @Override
334             public void onFailure(Throwable t) {
335                 LOG.error(t, "An exception happened during commit");
336                 shardMBean.incrementFailedTransactionsCount();
337                 sender.tell(new akka.actor.Status.Failure(t), self);
338             }
339         });
340
341     }
342
343     private void handleForwardedCommit(ForwardedCommitTransaction message) {
344         Object serializedModification =
345             message.getModification().toSerializable();
346
347         modificationToCohort
348             .put(serializedModification, message.getCohort());
349
350         if (persistent) {
351             this.persistData(getSender(), "identifier",
352                 new CompositeModificationPayload(serializedModification));
353         } else {
354             this.commit(getSender(), serializedModification);
355         }
356     }
357
358     private void updateSchemaContext(UpdateSchemaContext message) {
359         this.schemaContext = message.getSchemaContext();
360         updateSchemaContext(message.getSchemaContext());
361         store.onGlobalContextUpdated(message.getSchemaContext());
362     }
363
364     @VisibleForTesting void updateSchemaContext(SchemaContext schemaContext) {
365         store.onGlobalContextUpdated(schemaContext);
366     }
367
368     private void registerChangeListener(
369         RegisterChangeListener registerChangeListener) {
370
371         LOG.debug("registerDataChangeListener for {}", registerChangeListener
372             .getPath());
373
374
375         ActorSelection dataChangeListenerPath = getContext()
376             .system().actorSelection(
377                 registerChangeListener.getDataChangeListenerPath());
378
379
380         // Notify the listener if notifications should be enabled or not
381         // If this shard is the leader then it will enable notifications else
382         // it will not
383         dataChangeListenerPath
384             .tell(new EnableNotification(isLeader()), getSelf());
385
386         // Now store a reference to the data change listener so it can be notified
387         // at a later point if notifications should be enabled or disabled
388         dataChangeListeners.add(dataChangeListenerPath);
389
390         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
391             listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
392
393         ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
394             registration = store.registerChangeListener(registerChangeListener.getPath(),
395                 listener, registerChangeListener.getScope());
396         ActorRef listenerRegistration =
397             getContext().actorOf(
398                 DataChangeListenerRegistration.props(registration));
399
400         LOG.debug(
401             "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
402             , listenerRegistration.path().toString());
403
404         getSender()
405             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
406                 getSelf());
407     }
408
409     private void createTransactionChain() {
410         DOMStoreTransactionChain chain = store.createTransactionChain();
411         ActorRef transactionChain = getContext().actorOf(
412                 ShardTransactionChain.props(chain, schemaContext, datastoreContext, shardMBean));
413         getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
414                 getSelf());
415     }
416
417     private boolean isMetricsCaptureEnabled(){
418         CommonConfig config = new CommonConfig(getContext().system().settings().config());
419         return config.isMetricCaptureEnabled();
420     }
421
422     @Override protected void applyState(ActorRef clientActor, String identifier,
423         Object data) {
424
425         if (data instanceof CompositeModificationPayload) {
426             Object modification =
427                 ((CompositeModificationPayload) data).getModification();
428
429             if (modification != null) {
430                 commit(clientActor, modification);
431             } else {
432                 LOG.error(
433                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
434                     identifier, clientActor.path().toString());
435             }
436
437         } else {
438             LOG.error("Unknown state received {}", data);
439         }
440
441         // Update stats
442         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
443
444         if (lastLogEntry != null) {
445             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
446             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
447         }
448
449         shardMBean.setCommitIndex(getCommitIndex());
450         shardMBean.setLastApplied(getLastApplied());
451
452     }
453
454     @Override protected void createSnapshot() {
455         if (createSnapshotTransaction == null) {
456
457             // Create a transaction. We are really going to treat the transaction as a worker
458             // so that this actor does not get block building the snapshot
459             createSnapshotTransaction = createTransaction(
460                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
461                 "createSnapshot");
462
463             createSnapshotTransaction.tell(
464                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
465
466         }
467     }
468
469     @VisibleForTesting @Override protected void applySnapshot(ByteString snapshot) {
470         // Since this will be done only on Recovery or when this actor is a Follower
471         // we can safely commit everything in here. We not need to worry about event notifications
472         // as they would have already been disabled on the follower
473         try {
474             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
475             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
476             NormalizedNode<?, ?> node = new NormalizedNodeToNodeCodec(schemaContext)
477                 .decode(YangInstanceIdentifier.builder().build(), serializedNode);
478
479             // delete everything first
480             transaction.delete(YangInstanceIdentifier.builder().build());
481
482             // Add everything from the remote node back
483             transaction.write(YangInstanceIdentifier.builder().build(), node);
484             syncCommitTransaction(transaction);
485         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
486             LOG.error(e, "An exception occurred when applying snapshot");
487         }
488     }
489
490     @Override protected void onStateChanged() {
491         for (ActorSelection dataChangeListener : dataChangeListeners) {
492             dataChangeListener
493                 .tell(new EnableNotification(isLeader()), getSelf());
494         }
495
496         if (getLeaderId() != null) {
497             shardMBean.setLeader(getLeaderId());
498         }
499
500         shardMBean.setRaftState(getRaftState().name());
501         shardMBean.setCurrentTerm(getCurrentTerm());
502     }
503
504     @Override public String persistenceId() {
505         return this.name.toString();
506     }
507
508
509     private static class ShardConfigParams extends DefaultConfigParamsImpl {
510         public static final FiniteDuration HEART_BEAT_INTERVAL =
511             new FiniteDuration(500, TimeUnit.MILLISECONDS);
512
513         @Override public FiniteDuration getHeartBeatInterval() {
514             return HEART_BEAT_INTERVAL;
515         }
516     }
517
518     private static class ShardCreator implements Creator<Shard> {
519
520         private static final long serialVersionUID = 1L;
521
522         final ShardIdentifier name;
523         final Map<ShardIdentifier, String> peerAddresses;
524         final DatastoreContext datastoreContext;
525         final SchemaContext schemaContext;
526
527         ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
528                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
529             this.name = name;
530             this.peerAddresses = peerAddresses;
531             this.datastoreContext = datastoreContext;
532             this.schemaContext = schemaContext;
533         }
534
535         @Override
536         public Shard create() throws Exception {
537             return new Shard(name, peerAddresses, datastoreContext, schemaContext);
538         }
539     }
540
541     @VisibleForTesting NormalizedNode readStore() throws ExecutionException, InterruptedException {
542         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
543
544         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
545             transaction.read(YangInstanceIdentifier.builder().build());
546
547         NormalizedNode<?, ?> node = future.get().get();
548
549         transaction.close();
550
551         return node;
552     }
553
554     @VisibleForTesting void writeToStore(YangInstanceIdentifier id, NormalizedNode node)
555         throws ExecutionException, InterruptedException {
556         DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
557
558         transaction.write(id, node);
559
560         syncCommitTransaction(transaction);
561     }
562
563 }