d6ad553cf3a237002645811dad1b299c3f2fd309
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.persistence.Persistent;
18 import akka.persistence.RecoveryCompleted;
19 import akka.persistence.UntypedProcessor;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.ListeningExecutorService;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.NonPersistent;
32 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
33 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
34 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
35 import org.opendaylight.controller.cluster.datastore.modification.Modification;
36 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
37 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
38 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45
46 import java.util.HashMap;
47 import java.util.Map;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.Executors;
50
51 /**
52  * A Shard represents a portion of the logical data tree <br/>
53  * <p>
54  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
55  * </p>
56  */
57 public class Shard extends UntypedProcessor {
58
59     public static final String DEFAULT_NAME = "default";
60
61     private final ListeningExecutorService storeExecutor =
62         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
63
64     private final InMemoryDOMDataStore store;
65
66     private final Map<Object, DOMStoreThreePhaseCommitCohort>
67         modificationToCohort = new HashMap<>();
68
69     private final LoggingAdapter LOG =
70         Logging.getLogger(getContext().system(), this);
71
72     // By default persistent will be true and can be turned off using the system
73     // property persistent
74     private final boolean persistent;
75
76     private SchemaContext schemaContext;
77
78     private final ShardStats shardMBean;
79
80     private Shard(String name) {
81
82         String setting = System.getProperty("shard.persistent");
83
84         this.persistent = !"false".equals(setting);
85
86         LOG.info("Creating shard : {} persistent : {}", name, persistent);
87
88         store = new InMemoryDOMDataStore(name, storeExecutor);
89
90         shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
91
92     }
93
94     public static Props props(final String name) {
95         return Props.create(new Creator<Shard>() {
96
97             @Override
98             public Shard create() throws Exception {
99                 return new Shard(name);
100             }
101
102         });
103     }
104
105
106     @Override
107     public void onReceive(Object message) throws Exception {
108         LOG.debug("Received message " + message.getClass().toString());
109
110         if(!recoveryFinished()){
111             // FIXME : Properly handle recovery
112             return;
113         }
114
115         if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
116             createTransactionChain();
117         } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
118             registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
119         } else if (message instanceof UpdateSchemaContext) {
120             updateSchemaContext((UpdateSchemaContext) message);
121         } else if (message instanceof ForwardedCommitTransaction) {
122             handleForwardedCommit((ForwardedCommitTransaction) message);
123         } else if (message instanceof Persistent) {
124             commit(((Persistent)message).payload());
125         } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
126             createTransaction(CreateTransaction.fromSerializable(message));
127         } else if(message instanceof NonPersistent){
128             commit(((NonPersistent)message).payload());
129         }else if (message instanceof RecoveryCompleted) {
130             //FIXME: PROPERLY HANDLE RECOVERY COMPLETED
131
132         }else {
133           throw new Exception("Not recognized message found message=" + message);
134         }
135     }
136
137     private void createTransaction(CreateTransaction createTransaction) {
138         DOMStoreReadWriteTransaction transaction =
139             store.newReadWriteTransaction();
140         ActorRef transactionActor = getContext().actorOf(
141             ShardTransaction.props(transaction, getSelf(), schemaContext), "shard-" + createTransaction.getTransactionId());
142         getSender()
143             .tell(new CreateTransactionReply(transactionActor.path().toString(), createTransaction.getTransactionId()).toSerializable(),
144                 getSelf());
145     }
146
147     private void commit(Object serialized) {
148         Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
149         DOMStoreThreePhaseCommitCohort cohort =
150             modificationToCohort.remove(serialized);
151         if (cohort == null) {
152             LOG.error(
153                 "Could not find cohort for modification : " + modification);
154             return;
155         }
156         final ListenableFuture<Void> future = cohort.commit();
157         shardMBean.incrementCommittedTransactionCount();
158         final ActorRef sender = getSender();
159         final ActorRef self = getSelf();
160         future.addListener(new Runnable() {
161             @Override
162             public void run() {
163                 try {
164                     future.get();
165                     sender.tell(new CommitTransactionReply().toSerializable(), self);
166                 } catch (InterruptedException | ExecutionException e) {
167                     // FIXME : Handle this properly
168                     LOG.error(e, "An exception happened when committing");
169                 }
170             }
171         }, getContext().dispatcher());
172     }
173
174     private void handleForwardedCommit(ForwardedCommitTransaction message) {
175         Object serializedModification = message.getModification().toSerializable();
176
177         modificationToCohort
178             .put(serializedModification , message.getCohort());
179         if(persistent) {
180             getSelf().forward(Persistent.create(serializedModification),
181                 getContext());
182         } else {
183             getSelf().forward(NonPersistent.create(serializedModification),
184                 getContext());
185         }
186     }
187
188     private void updateSchemaContext(UpdateSchemaContext message) {
189         this.schemaContext = message.getSchemaContext();
190         store.onGlobalContextUpdated(message.getSchemaContext());
191     }
192
193     private void registerChangeListener(
194         RegisterChangeListener registerChangeListener) {
195
196         LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
197
198
199         ActorSelection dataChangeListenerPath = getContext()
200             .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
201
202         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
203             listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
204
205         org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
206             registration =
207             store.registerChangeListener(registerChangeListener.getPath(),
208                 listener, registerChangeListener.getScope());
209         ActorRef listenerRegistration =
210             getContext().actorOf(
211                 DataChangeListenerRegistration.props(registration));
212
213         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
214
215         getSender()
216             .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
217                 getSelf());
218     }
219
220     private void createTransactionChain() {
221         DOMStoreTransactionChain chain = store.createTransactionChain();
222         ActorRef transactionChain =
223             getContext().actorOf(ShardTransactionChain.props(chain, schemaContext));
224         getSender()
225             .tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
226                 getSelf());
227     }
228 }