import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
private final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
- // By default persistent will be true and can be turned off using the system
- // property shard.persistent
- private final boolean persistent;
-
/// The name of this shard
private final ShardIdentifier name;
private final DatastoreContext datastoreContext;
+ private final DataPersistenceProvider dataPersistenceProvider;
+
private SchemaContext schemaContext;
private ActorRef createSnapshotTransaction;
this.name = name;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
+ this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
- String setting = System.getProperty("shard.persistent");
-
- this.persistent = !"false".equals(setting);
-
- LOG.info("Shard created : {} persistent : {}", name, persistent);
+ LOG.info("Shard created : {} persistent : {}", name, datastoreContext.isPersistent());
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
}
@Override
- public void onReceiveRecover(Object message) {
+ public void onReceiveRecover(Object message) throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("onReceiveRecover: Received message {} from {}",
message.getClass().toString(),
}
@Override
- public void onReceiveCommand(Object message) {
+ public void onReceiveCommand(Object message) throws Exception {
if(LOG.isDebugEnabled()) {
LOG.debug("onReceiveCommand: Received message {} from {}", message, getSender());
}
// currently uses a same thread executor anyway.
cohortEntry.getCohort().preCommit().get();
- if(persistent) {
- Shard.this.persistData(getSender(), transactionID,
- new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
- } else {
- Shard.this.finishCommit(getSender(), transactionID);
- }
+ Shard.this.persistData(getSender(), transactionID,
+ new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
} catch (InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred while preCommitting transaction {}",
cohortEntry.getTransactionID());
} else if (getLeader() != null) {
getLeader().forward(message, getContext());
} else {
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(
"Could not find shard leader so transaction cannot be created. This typically happens" +
- " when system is coming up or recovering and a leader is being elected. Try again" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
" later.")), getSelf());
}
}
}
}
+ @Override
+ protected DataPersistenceProvider persistence() {
+ return dataPersistenceProvider;
+ }
+
@Override protected void onLeaderChanged(String oldLeader, String newLeader) {
shardMBean.setLeader(newLeader);
}
return this.name.toString();
}
+ @VisibleForTesting
+ DataPersistenceProvider getDataPersistenceProvider() {
+ return dataPersistenceProvider;
+ }
+
private static class ShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;