Add optional lz4 compression for snapshots
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index f634aecc6285fbbe925f89044df2c326ced9ea10..eeacdd9b6ffa9cb247c698d64c68936f8e594caa 100644 (file)
@@ -25,6 +25,7 @@ import com.google.common.base.Ticker;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Range;
 import java.io.IOException;
 import java.util.Arrays;
@@ -76,6 +77,8 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
@@ -108,8 +111,8 @@ import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -220,10 +223,12 @@ public class Shard extends RaftActor {
                 new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
         if (builder.getDataTree() != null) {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
-                    treeChangeListenerPublisher, name, frontendMetadata);
+                    treeChangeListenerPublisher, name,
+                    frontendMetadata);
         } else {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
-                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name, frontendMetadata);
+                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher, name,
+                    frontendMetadata);
         }
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
@@ -248,7 +253,7 @@ public class Shard extends RaftActor {
                 self(), getContext(), shardMBean, builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
-            this.name);
+            this.name, datastoreContext);
 
         messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
 
@@ -279,7 +284,7 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    public void postStop() {
+    public void postStop() throws Exception {
         LOG.info("Stopping Shard {}", persistenceId());
 
         super.postStop();
@@ -371,6 +376,8 @@ public class Shard extends RaftActor {
                 onMakeLeaderLocal();
             } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
                 store.resumeNextPendingTransaction();
+            } else if (GetKnownClients.INSTANCE.equals(message)) {
+                handleGetKnownClients();
             } else if (!responseMessageSlicer.handleMessage(message)) {
                 super.handleNonRaftCommand(message);
             }
@@ -597,6 +604,18 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void handleGetKnownClients() {
+        final ImmutableSet<ClientIdentifier> clients;
+        if (isLeader()) {
+            clients = knownFrontends.values().stream()
+                    .map(LeaderFrontendState::getIdentifier)
+                    .collect(ImmutableSet.toImmutableSet());
+        } else {
+            clients = frontendMetadata.getClients();
+        }
+        sender().tell(new GetKnownClientsReply(clients), self());
+    }
+
     private boolean hasLeader() {
         return getLeaderId() != null;
     }
@@ -881,11 +900,11 @@ public class Shard extends RaftActor {
     }
 
     private void updateSchemaContext(final UpdateSchemaContext message) {
-        updateSchemaContext(message.getSchemaContext());
+        updateSchemaContext(message.getEffectiveModelContext());
     }
 
     @VisibleForTesting
-    void updateSchemaContext(final SchemaContext schemaContext) {
+    void updateSchemaContext(final @NonNull EffectiveModelContext schemaContext) {
         store.updateSchemaContext(schemaContext);
     }
 
@@ -1091,7 +1110,7 @@ public class Shard extends RaftActor {
         private ShardIdentifier id;
         private Map<String, String> peerAddresses = Collections.emptyMap();
         private DatastoreContext datastoreContext;
-        private SchemaContextProvider schemaContextProvider;
+        private EffectiveModelContextProvider schemaContextProvider;
         private DatastoreSnapshot.ShardSnapshot restoreFromSnapshot;
         private DataTree dataTree;
 
@@ -1128,7 +1147,7 @@ public class Shard extends RaftActor {
             return self();
         }
 
-        public T schemaContextProvider(final SchemaContextProvider newSchemaContextProvider) {
+        public T schemaContextProvider(final EffectiveModelContextProvider newSchemaContextProvider) {
             checkSealed();
             this.schemaContextProvider = requireNonNull(newSchemaContextProvider);
             return self();
@@ -1158,8 +1177,8 @@ public class Shard extends RaftActor {
             return datastoreContext;
         }
 
-        public SchemaContext getSchemaContext() {
-            return Verify.verifyNotNull(schemaContextProvider.getSchemaContext());
+        public EffectiveModelContext getSchemaContext() {
+            return Verify.verifyNotNull(schemaContextProvider.getEffectiveModelContext());
         }
 
         public DatastoreSnapshot.ShardSnapshot getRestoreFromSnapshot() {
@@ -1201,7 +1220,7 @@ public class Shard extends RaftActor {
             this(Shard.class);
         }
 
-        Builder(Class<? extends Shard> shardClass) {
+        Builder(final Class<? extends Shard> shardClass) {
             super(shardClass);
         }
     }