+ public static final String DEFAULT_NAME = "default";
+
+ private final ListeningExecutorService storeExecutor =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+
+ private final InMemoryDOMDataStore store;
+
+ private final Map<Modification, DOMStoreThreePhaseCommitCohort>
+ modificationToCohort = new HashMap<>();
+
+ private final LoggingAdapter log =
+ Logging.getLogger(getContext().system(), this);
+
+ // By default persistent will be true and can be turned off using the system
+ // property persistent
+ private final boolean persistent;
+
+ private Shard(String name) {
+
+ String setting = System.getProperty("shard.persistent");
+ this.persistent = !"false".equals(setting);
+
+ log.info("Creating shard : {} persistent : {}", name , persistent);
+
+ store = new InMemoryDOMDataStore(name, storeExecutor);
+ }
+
+ public static Props props(final String name) {
+ return Props.create(new Creator<Shard>() {
+
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(name);
+ }
+
+ });
+ }
+
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ log.debug("Received message {}", message);
+
+ if (message instanceof CreateTransactionChain) {
+ createTransactionChain();
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof ForwardedCommitTransaction) {
+ handleForwardedCommit((ForwardedCommitTransaction) message);
+ } else if (message instanceof Persistent) {
+ commit((Modification) ((Persistent) message).payload());
+ } else if (message instanceof CreateTransaction) {
+ createTransaction((CreateTransaction) message);
+ } else if(message instanceof NonPersistent){
+ commit((Modification) ((NonPersistent) message).payload());
+ }
+ }
+
+ private void createTransaction(CreateTransaction createTransaction) {
+ DOMStoreReadWriteTransaction transaction =
+ store.newReadWriteTransaction();
+ ActorRef transactionActor = getContext().actorOf(
+ ShardTransaction.props(transaction, getSelf()), "shard-" + createTransaction.getTransactionId());
+ getSender()
+ .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()),
+ getSelf());
+ }