Switch to using protocol buffer serialization for the WriteData message
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import akka.persistence.Persistent;
18 import akka.persistence.UntypedProcessor;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.ListeningExecutorService;
21 import com.google.common.util.concurrent.MoreExecutors;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.NonPersistent;
29 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
30 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
31 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
32 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
34 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41
42 import java.util.HashMap;
43 import java.util.Map;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.Executors;
46
47 /**
48  * A Shard represents a portion of the logical data tree <br/>
49  * <p>
50  * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
51  * </p>
52  */
53 public class Shard extends UntypedProcessor {
54
55     public static final String DEFAULT_NAME = "default";
56
57     private final ListeningExecutorService storeExecutor =
58         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
59
60     private final InMemoryDOMDataStore store;
61
62     private final Map<Modification, DOMStoreThreePhaseCommitCohort>
63         modificationToCohort = new HashMap<>();
64
65     private final LoggingAdapter log =
66         Logging.getLogger(getContext().system(), this);
67
68     // By default persistent will be true and can be turned off using the system
69     // property persistent
70     private final boolean persistent;
71
72     private SchemaContext schemaContext;
73
74     private Shard(String name) {
75
76         String setting = System.getProperty("shard.persistent");
77         this.persistent = !"false".equals(setting);
78
79         log.info("Creating shard : {} persistent : {}", name , persistent);
80
81         store = new InMemoryDOMDataStore(name, storeExecutor);
82     }
83
84     public static Props props(final String name) {
85         return Props.create(new Creator<Shard>() {
86
87             @Override
88             public Shard create() throws Exception {
89                 return new Shard(name);
90             }
91
92         });
93     }
94
95
96     @Override
97     public void onReceive(Object message) throws Exception {
98         log.debug("Received message {}", message);
99
100         if (message instanceof CreateTransactionChain) {
101             createTransactionChain();
102         } else if (message instanceof RegisterChangeListener) {
103             registerChangeListener((RegisterChangeListener) message);
104         } else if (message instanceof UpdateSchemaContext) {
105             updateSchemaContext((UpdateSchemaContext) message);
106         } else if (message instanceof ForwardedCommitTransaction) {
107             handleForwardedCommit((ForwardedCommitTransaction) message);
108         } else if (message instanceof Persistent) {
109             commit((Modification) ((Persistent) message).payload());
110         } else if (message instanceof CreateTransaction) {
111             createTransaction((CreateTransaction) message);
112         } else if(message instanceof NonPersistent){
113             commit((Modification) ((NonPersistent) message).payload());
114         }
115     }
116
117     private void createTransaction(CreateTransaction createTransaction) {
118         DOMStoreReadWriteTransaction transaction =
119             store.newReadWriteTransaction();
120         ActorRef transactionActor = getContext().actorOf(
121             ShardTransaction.props(transaction, getSelf(), schemaContext), "shard-" + createTransaction.getTransactionId());
122         getSender()
123             .tell(new CreateTransactionReply(transactionActor.path().toString(), createTransaction.getTransactionId()).toSerializable(),
124                 getSelf());
125     }
126
127     private void commit(Modification modification) {
128         DOMStoreThreePhaseCommitCohort cohort =
129             modificationToCohort.remove(modification);
130         if (cohort == null) {
131             log.error(
132                 "Could not find cohort for modification : " + modification);
133             return;
134         }
135         final ListenableFuture<Void> future = cohort.commit();
136         final ActorRef sender = getSender();
137         final ActorRef self = getSelf();
138         future.addListener(new Runnable() {
139             @Override
140             public void run() {
141                 try {
142                     future.get();
143                     sender.tell(new CommitTransactionReply(), self);
144                 } catch (InterruptedException | ExecutionException e) {
145                     // FIXME : Handle this properly
146                     log.error(e, "An exception happened when committing");
147                 }
148             }
149         }, getContext().dispatcher());
150     }
151
152     private void handleForwardedCommit(ForwardedCommitTransaction message) {
153         modificationToCohort
154             .put(message.getModification(), message.getCohort());
155         if(persistent) {
156             getSelf().forward(Persistent.create(message.getModification()),
157                 getContext());
158         } else {
159             getSelf().forward(NonPersistent.create(message.getModification()),
160                 getContext());
161         }
162     }
163
164     private void updateSchemaContext(UpdateSchemaContext message) {
165         this.schemaContext = message.getSchemaContext();
166         store.onGlobalContextUpdated(message.getSchemaContext());
167     }
168
169     private void registerChangeListener(
170         RegisterChangeListener registerChangeListener) {
171
172         ActorSelection dataChangeListenerPath = getContext()
173             .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
174
175         AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>
176             listener = new DataChangeListenerProxy(dataChangeListenerPath);
177
178         org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
179             registration =
180             store.registerChangeListener(registerChangeListener.getPath(),
181                 listener, registerChangeListener.getScope());
182         ActorRef listenerRegistration =
183             getContext().actorOf(
184                 DataChangeListenerRegistration.props(registration));
185         getSender()
186             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
187                 getSelf());
188     }
189
190     private void createTransactionChain() {
191         DOMStoreTransactionChain chain = store.createTransactionChain();
192         ActorRef transactionChain =
193             getContext().actorOf(ShardTransactionChain.props(chain, schemaContext));
194         getSender()
195             .tell(new CreateTransactionChainReply(transactionChain.path()),
196                 getSelf());
197     }
198 }