Clean up mdsal-singleton-dom-impl
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / EOSClusterSingletonServiceProvider.java
diff --git a/singleton-service/mdsal-singleton-dom-impl/src/main/java/org/opendaylight/mdsal/singleton/dom/impl/EOSClusterSingletonServiceProvider.java b/singleton-service/mdsal-singleton-dom-impl/src/main/java/org/opendaylight/mdsal/singleton/dom/impl/EOSClusterSingletonServiceProvider.java
new file mode 100644 (file)
index 0000000..1c9469f
--- /dev/null
@@ -0,0 +1,243 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.singleton.dom.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ClusterSingletonServiceProvider} working on top a {@link DOMEntityOwnershipService}.
+ */
+@Singleton
+@Component(service = ClusterSingletonServiceProvider.class)
+public final class EOSClusterSingletonServiceProvider
+        implements ClusterSingletonServiceProvider, DOMEntityOwnershipListener, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(EOSClusterSingletonServiceProvider.class);
+
+    @VisibleForTesting
+    static final @NonNull String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
+    @VisibleForTesting
+    static final @NonNull String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
+
+    private final ConcurrentMap<String, ClusterSingletonServiceGroup> serviceGroupMap = new ConcurrentHashMap<>();
+    private final DOMEntityOwnershipService entityOwnershipService;
+
+    /* EOS Entity Listeners Registration */
+    private Registration serviceEntityListenerReg;
+    private Registration asyncCloseEntityListenerReg;
+
+    @Inject
+    @Activate
+    public EOSClusterSingletonServiceProvider(@Reference final DOMEntityOwnershipService entityOwnershipService) {
+        this.entityOwnershipService = requireNonNull(entityOwnershipService);
+        serviceEntityListenerReg = entityOwnershipService.registerListener(SERVICE_ENTITY_TYPE, this);
+        asyncCloseEntityListenerReg = entityOwnershipService.registerListener(CLOSE_SERVICE_ENTITY_TYPE, this);
+        LOG.info("Cluster Singleton Service started");
+    }
+
+    @PreDestroy
+    @Deactivate
+    @Override
+    public synchronized void close() throws ExecutionException, InterruptedException {
+        if (serviceEntityListenerReg == null) {
+            // Idempotent
+            return;
+        }
+
+        LOG.info("Cluster Singleton Service stopping");
+        serviceEntityListenerReg.close();
+        serviceEntityListenerReg = null;
+
+        final var future = Futures.allAsList(serviceGroupMap.values().stream()
+            .map(ClusterSingletonServiceGroup::closeClusterSingletonGroup)
+            .toList());
+        try {
+            LOG.debug("Waiting for service groups to stop");
+            future.get();
+        } finally {
+            asyncCloseEntityListenerReg.close();
+            asyncCloseEntityListenerReg = null;
+            serviceGroupMap.clear();
+        }
+
+        LOG.info("Cluster Singleton Service stopped");
+    }
+
+    @Override
+    public synchronized ClusterSingletonServiceRegistration registerClusterSingletonService(
+            final ClusterSingletonService service) {
+        LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service, this);
+
+        final String serviceIdentifier = service.getIdentifier().getName();
+        checkArgument(!Strings.isNullOrEmpty(serviceIdentifier),
+            "ClusterSingletonService identifier may not be null nor empty");
+
+        final ClusterSingletonServiceGroup serviceGroup;
+        final var existing = serviceGroupMap.get(serviceIdentifier);
+        if (existing == null) {
+            serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
+            serviceGroupMap.put(serviceIdentifier, serviceGroup);
+
+            try {
+                initializeOrRemoveGroup(serviceGroup);
+            } catch (CandidateAlreadyRegisteredException e) {
+                throw new IllegalArgumentException("Service group already registered", e);
+            }
+        } else {
+            serviceGroup = existing;
+        }
+
+        final var reg = new AbstractClusterSingletonServiceRegistration(service) {
+            @Override
+            protected void removeRegistration() {
+                // We need to bounce the unregistration through a ordered lock in order not to deal with asynchronous
+                // shutdown of the group and user registering it again.
+                EOSClusterSingletonServiceProvider.this.removeRegistration(serviceIdentifier, this);
+            }
+        };
+
+        serviceGroup.registerService(reg);
+        return reg;
+    }
+
+    private ClusterSingletonServiceGroup createGroup(final String serviceIdentifier,
+            final List<ClusterSingletonServiceRegistration> services) {
+        return new ClusterSingletonServiceGroupImpl(serviceIdentifier, entityOwnershipService,
+            createEntity(SERVICE_ENTITY_TYPE, serviceIdentifier),
+            createEntity(CLOSE_SERVICE_ENTITY_TYPE, serviceIdentifier), services);
+    }
+
+    private void initializeOrRemoveGroup(final ClusterSingletonServiceGroup group)
+            throws CandidateAlreadyRegisteredException {
+        try {
+            group.initialize();
+        } catch (CandidateAlreadyRegisteredException e) {
+            serviceGroupMap.remove(group.getIdentifier(), group);
+            throw e;
+        }
+    }
+
+    private void removeRegistration(final String serviceIdentifier, final ClusterSingletonServiceRegistration reg) {
+        final PlaceholderGroup placeHolder;
+        final ListenableFuture<?> future;
+        synchronized (this) {
+            final var lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier));
+            future = lookup.unregisterService(reg);
+            if (future == null) {
+                return;
+            }
+
+            // Close the group and replace it with a placeholder
+            LOG.debug("Closing service group {}", serviceIdentifier);
+            placeHolder = new PlaceholderGroup(lookup, future);
+
+            final String identifier = reg.getInstance().getIdentifier().getName();
+            verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
+            LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
+
+            lookup.closeClusterSingletonGroup();
+        }
+
+        future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
+    }
+
+    private synchronized void finishShutdown(final PlaceholderGroup placeHolder) {
+        final var identifier = placeHolder.getIdentifier();
+        LOG.debug("Service group {} closed", identifier);
+
+        final var services = placeHolder.getServices();
+        if (services.isEmpty()) {
+            // No services, we are all done
+            if (serviceGroupMap.remove(identifier, placeHolder)) {
+                LOG.debug("Service group {} removed", placeHolder);
+            } else {
+                LOG.debug("Service group {} superseded by {}", placeHolder, serviceGroupMap.get(identifier));
+            }
+            return;
+        }
+
+        // Placeholder is being retired, we are reusing its services as the seed for the group.
+        final var group = createGroup(identifier, services);
+        verify(serviceGroupMap.replace(identifier, placeHolder, group));
+        placeHolder.setSuccessor(group);
+        LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
+
+        try {
+            initializeOrRemoveGroup(group);
+        } catch (CandidateAlreadyRegisteredException e) {
+            LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
+        }
+    }
+
+    @Override
+    public void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
+            final boolean inJeopardy) {
+        LOG.debug("Ownership change for ClusterSingletonService Provider on {} {} inJeopardy={}", entity, change,
+            inJeopardy);
+
+        final var serviceIdentifier = getServiceIdentifierFromEntity(entity);
+        final var serviceHolder = serviceGroupMap.get(serviceIdentifier);
+        if (serviceHolder != null) {
+            serviceHolder.ownershipChanged(entity, change, inJeopardy);
+        } else {
+            LOG.debug("ClusterSingletonServiceGroup was not found for serviceIdentifier {}", serviceIdentifier);
+        }
+    }
+
+    @VisibleForTesting
+    static String getServiceIdentifierFromEntity(final DOMEntity entity) {
+        final var yii = entity.getIdentifier();
+        final var niiwp = (NodeIdentifierWithPredicates) yii.getLastPathArgument();
+        return niiwp.values().iterator().next().toString();
+    }
+
+    /**
+     * Creates an extended {@link DOMEntity} instance.
+     *
+     * @param entityType the type of the entity
+     * @param entityIdentifier the identifier of the entity
+     * @return instance of Entity extended GenericEntity type
+     */
+    @VisibleForTesting
+    static DOMEntity createEntity(final String entityType, final String entityIdentifier) {
+        return new DOMEntity(entityType, entityIdentifier);
+    }
+}