8332a926057ee5f4ed0c6aad91ae19b2503ce6d5
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / EOSClusterSingletonServiceProvider.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.singleton.dom.impl;
9
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.ExecutionException;
23 import javax.annotation.PreDestroy;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
29 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
30 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
31 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
32 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
34 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
35 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
36 import org.opendaylight.yangtools.concepts.Registration;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
38 import org.osgi.service.component.annotations.Activate;
39 import org.osgi.service.component.annotations.Component;
40 import org.osgi.service.component.annotations.Deactivate;
41 import org.osgi.service.component.annotations.Reference;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Implementation of {@link ClusterSingletonServiceProvider} working on top a {@link DOMEntityOwnershipService}.
47  */
48 @Singleton
49 @Component(service = ClusterSingletonServiceProvider.class)
50 public final class EOSClusterSingletonServiceProvider
51         implements ClusterSingletonServiceProvider, DOMEntityOwnershipListener, AutoCloseable {
52     private static final Logger LOG = LoggerFactory.getLogger(EOSClusterSingletonServiceProvider.class);
53
54     @VisibleForTesting
55     static final @NonNull String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
56     @VisibleForTesting
57     static final @NonNull String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
58
59     private final ConcurrentMap<String, ServiceGroup> serviceGroupMap = new ConcurrentHashMap<>();
60     private final DOMEntityOwnershipService entityOwnershipService;
61
62     /* EOS Entity Listeners Registration */
63     private Registration serviceEntityListenerReg;
64     private Registration asyncCloseEntityListenerReg;
65
66     @Inject
67     @Activate
68     public EOSClusterSingletonServiceProvider(@Reference final DOMEntityOwnershipService entityOwnershipService) {
69         this.entityOwnershipService = requireNonNull(entityOwnershipService);
70         serviceEntityListenerReg = entityOwnershipService.registerListener(SERVICE_ENTITY_TYPE, this);
71         asyncCloseEntityListenerReg = entityOwnershipService.registerListener(CLOSE_SERVICE_ENTITY_TYPE, this);
72         LOG.info("Cluster Singleton Service started");
73     }
74
75     @PreDestroy
76     @Deactivate
77     @Override
78     public void close() throws ExecutionException, InterruptedException {
79         final Registration reg;
80         final ListenableFuture<?> future;
81         synchronized (this) {
82             if (serviceEntityListenerReg == null) {
83                 // Idempotent
84                 return;
85             }
86
87             LOG.info("Cluster Singleton Service stopping");
88             reg = serviceEntityListenerReg;
89             serviceEntityListenerReg = null;
90             future = Futures.allAsList(serviceGroupMap.values().stream()
91                 .map(ServiceGroup::closeClusterSingletonGroup)
92                 .toList());
93         }
94
95         try {
96             LOG.debug("Waiting for service groups to stop");
97             future.get();
98         } finally {
99             reg.close();
100             asyncCloseEntityListenerReg.close();
101             asyncCloseEntityListenerReg = null;
102             serviceGroupMap.clear();
103             LOG.info("Cluster Singleton Service stopped");
104         }
105     }
106
107     @Override
108     public synchronized Registration registerClusterSingletonService(final ClusterSingletonService service) {
109         final var serviceIdentifier = requireNonNull(service.getIdentifier());
110         if (serviceEntityListenerReg == null) {
111             throw new IllegalStateException(this + "is closed");
112         }
113
114         LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service, this);
115
116         final var identifierValue = serviceIdentifier.value();
117         final ServiceGroup serviceGroup;
118         final var existing = serviceGroupMap.get(identifierValue);
119         if (existing == null) {
120             serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
121             serviceGroupMap.put(identifierValue, serviceGroup);
122
123             try {
124                 initializeOrRemoveGroup(serviceGroup);
125             } catch (CandidateAlreadyRegisteredException e) {
126                 throw new IllegalArgumentException("Service group already registered", e);
127             }
128         } else {
129             serviceGroup = existing;
130         }
131
132         final var reg = new ServiceRegistration(service) {
133             @Override
134             protected void removeRegistration() {
135                 // We need to bounce the unregistration through a ordered lock in order not to deal with asynchronous
136                 // shutdown of the group and user registering it again.
137                 EOSClusterSingletonServiceProvider.this.removeRegistration(serviceIdentifier, this);
138             }
139         };
140
141         serviceGroup.registerService(reg);
142         return reg;
143     }
144
145     private ServiceGroup createGroup(final ServiceGroupIdentifier identifier,
146             final List<ServiceRegistration> services) {
147         return new ActiveServiceGroup(identifier, entityOwnershipService,
148             createEntity(SERVICE_ENTITY_TYPE, identifier), createEntity(CLOSE_SERVICE_ENTITY_TYPE, identifier),
149             services);
150     }
151
152     @Holding("this")
153     private void initializeOrRemoveGroup(final ServiceGroup group) throws CandidateAlreadyRegisteredException {
154         try {
155             group.initialize();
156         } catch (CandidateAlreadyRegisteredException e) {
157             serviceGroupMap.remove(group.getIdentifier(), group);
158             throw e;
159         }
160     }
161
162     private void removeRegistration(final ServiceGroupIdentifier serviceIdentifier, final ServiceRegistration reg) {
163         final PlaceholderServiceGroup placeHolder;
164         final ListenableFuture<?> future;
165         synchronized (this) {
166             final var lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier.value()));
167             future = lookup.unregisterService(reg);
168             if (future == null) {
169                 return;
170             }
171
172             // Close the group and replace it with a placeholder
173             LOG.debug("Closing service group {}", serviceIdentifier);
174             placeHolder = new PlaceholderServiceGroup(lookup, future);
175
176             final var identifier = reg.getInstance().getIdentifier().value();
177             verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
178             LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
179
180             lookup.closeClusterSingletonGroup();
181         }
182
183         future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
184     }
185
186     private synchronized void finishShutdown(final PlaceholderServiceGroup placeHolder) {
187         final var identifier = placeHolder.getIdentifier();
188         LOG.debug("Service group {} closed", identifier);
189
190         final var services = placeHolder.getServices();
191         if (services.isEmpty()) {
192             // No services, we are all done
193             if (serviceGroupMap.remove(identifier.value(), placeHolder)) {
194                 LOG.debug("Service group {} removed", placeHolder);
195             } else {
196                 LOG.debug("Service group {} superseded by {}", placeHolder, serviceGroupMap.get(identifier.value()));
197             }
198             return;
199         }
200
201         // Placeholder is being retired, we are reusing its services as the seed for the group.
202         final var group = createGroup(identifier, services);
203         verify(serviceGroupMap.replace(identifier.value(), placeHolder, group));
204         placeHolder.setSuccessor(group);
205         LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
206
207         try {
208             initializeOrRemoveGroup(group);
209         } catch (CandidateAlreadyRegisteredException e) {
210             LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
211         }
212     }
213
214     @Override
215     public void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
216             final boolean inJeopardy) {
217         LOG.debug("Ownership change for ClusterSingletonService Provider on {} {} inJeopardy={}", entity, change,
218             inJeopardy);
219
220         final var serviceIdentifier = getServiceIdentifierFromEntity(entity);
221         final var serviceHolder = serviceGroupMap.get(serviceIdentifier);
222         if (serviceHolder != null) {
223             serviceHolder.ownershipChanged(entity, change, inJeopardy);
224         } else {
225             LOG.debug("ClusterSingletonServiceGroup was not found for serviceIdentifier {}", serviceIdentifier);
226         }
227     }
228
229     @VisibleForTesting
230     static @NonNull String getServiceIdentifierFromEntity(final DOMEntity entity) {
231         final var yii = entity.getIdentifier();
232         final var niiwp = (NodeIdentifierWithPredicates) yii.getLastPathArgument();
233         return niiwp.values().iterator().next().toString();
234     }
235
236     /**
237      * Creates an extended {@link DOMEntity} instance.
238      *
239      * @param entityType the type of the entity
240      * @param sgi the identifier of the entity
241      * @return instance of Entity extended GenericEntity type
242      */
243     @VisibleForTesting
244     static DOMEntity createEntity(final String entityType, final ServiceGroupIdentifier sgi) {
245         return new DOMEntity(entityType, sgi.value());
246     }
247 }