9e40a6990779a137270668dffdf22edb0ea748ec
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.persistence.Persistent;
18 import akka.persistence.UntypedProcessor;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.ListeningExecutorService;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.NonPersistent;
29 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
30 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
31 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
32 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
34 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
35 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
39 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42
43 import java.util.HashMap;
44 import java.util.Map;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.Executors;
47
48 /**
49  * A Shard represents a portion of the logical data tree <br/>
50  * <p>
51  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
52  * </p>
53  */
54 public class Shard extends UntypedProcessor {
55
56     public static final String DEFAULT_NAME = "default";
57
58     private final ListeningExecutorService storeExecutor =
59         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
60
61     private final InMemoryDOMDataStore store;
62
63     private final Map<Object, DOMStoreThreePhaseCommitCohort>
64         modificationToCohort = new HashMap<>();
65
66     private final LoggingAdapter log =
67         Logging.getLogger(getContext().system(), this);
68
69     // By default persistent will be true and can be turned off using the system
70     // property persistent
71     private final boolean persistent;
72
73     private SchemaContext schemaContext;
74
75     private Shard(String name) {
76
77         String setting = System.getProperty("shard.persistent");
78         this.persistent = !"false".equals(setting);
79
80         log.info("Creating shard : {} persistent : {}", name , persistent);
81
82         store = new InMemoryDOMDataStore(name, storeExecutor);
83     }
84
85     public static Props props(final String name) {
86         return Props.create(new Creator<Shard>() {
87
88             @Override
89             public Shard create() throws Exception {
90                 return new Shard(name);
91             }
92
93         });
94     }
95
96
97     @Override
98     public void onReceive(Object message) throws Exception {
99         log.debug("Received message {}", message);
100
101         if(!recoveryFinished()){
102             // FIXME : Properly handle recovery
103             return;
104         }
105
106         if (message instanceof CreateTransactionChain) {
107             createTransactionChain();
108         } else if (message.getClass().equals(RegisterChangeListener.SERIALIZABLE_CLASS)) {
109             registerChangeListener(RegisterChangeListener.fromSerializable(getContext().system(), message));
110         } else if (message instanceof UpdateSchemaContext) {
111             updateSchemaContext((UpdateSchemaContext) message);
112         } else if (message instanceof ForwardedCommitTransaction) {
113             handleForwardedCommit((ForwardedCommitTransaction) message);
114         } else if (message instanceof Persistent) {
115             commit(((Persistent)message).payload());
116         } else if (message instanceof CreateTransaction) {
117             createTransaction((CreateTransaction) message);
118         } else if(message instanceof NonPersistent){
119             commit(((NonPersistent)message).payload());
120         } else {
121           throw new Exception("Not recognized message in Shard::OnReceive"+message);
122         }
123     }
124
125     private void createTransaction(CreateTransaction createTransaction) {
126         DOMStoreReadWriteTransaction transaction =
127             store.newReadWriteTransaction();
128         ActorRef transactionActor = getContext().actorOf(
129             ShardTransaction.props(transaction, getSelf(), schemaContext), "shard-" + createTransaction.getTransactionId());
130         getSender()
131             .tell(new CreateTransactionReply(transactionActor.path().toString(), createTransaction.getTransactionId()).toSerializable(),
132                 getSelf());
133     }
134
135     private void commit(Object serialized) {
136         Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
137         DOMStoreThreePhaseCommitCohort cohort =
138             modificationToCohort.remove(serialized);
139         if (cohort == null) {
140             log.error(
141                 "Could not find cohort for modification : " + modification);
142             return;
143         }
144         final ListenableFuture<Void> future = cohort.commit();
145         final ActorRef sender = getSender();
146         final ActorRef self = getSelf();
147         future.addListener(new Runnable() {
148             @Override
149             public void run() {
150                 try {
151                     future.get();
152                     sender.tell(new CommitTransactionReply(), self);
153                 } catch (InterruptedException | ExecutionException e) {
154                     // FIXME : Handle this properly
155                     log.error(e, "An exception happened when committing");
156                 }
157             }
158         }, getContext().dispatcher());
159     }
160
161     private void handleForwardedCommit(ForwardedCommitTransaction message) {
162         Object serializedModification = message.getModification().toSerializable();
163
164         modificationToCohort
165             .put(serializedModification , message.getCohort());
166         if(persistent) {
167             getSelf().forward(Persistent.create(serializedModification),
168                 getContext());
169         } else {
170             getSelf().forward(NonPersistent.create(serializedModification),
171                 getContext());
172         }
173     }
174
175     private void updateSchemaContext(UpdateSchemaContext message) {
176         this.schemaContext = message.getSchemaContext();
177         store.onGlobalContextUpdated(message.getSchemaContext());
178     }
179
180     private void registerChangeListener(
181         RegisterChangeListener registerChangeListener) {
182
183         ActorSelection dataChangeListenerPath = getContext()
184             .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
185
186         AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>
187             listener = new DataChangeListenerProxy(dataChangeListenerPath);
188
189         org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
190             registration =
191             store.registerChangeListener(registerChangeListener.getPath(),
192                 listener, registerChangeListener.getScope());
193         ActorRef listenerRegistration =
194             getContext().actorOf(
195                 DataChangeListenerRegistration.props(registration));
196         getSender()
197             .tell(new RegisterChangeListenerReply(listenerRegistration.path()).toSerializable(),
198                 getSelf());
199     }
200
201     private void createTransactionChain() {
202         DOMStoreTransactionChain chain = store.createTransactionChain();
203         ActorRef transactionChain =
204             getContext().actorOf(ShardTransactionChain.props(chain, schemaContext));
205         getSender()
206             .tell(new CreateTransactionChainReply(transactionChain.path()),
207                 getSelf());
208     }
209 }