Eliminate EntityOwnershipChange
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / AbstractClusterSingletonServiceProviderImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.singleton.dom.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Strings;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
27 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
28 import org.opendaylight.mdsal.eos.common.api.GenericEntity;
29 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
30 import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService;
31 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
34 import org.opendaylight.yangtools.concepts.HierarchicalIdentifier;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Abstract class {@link AbstractClusterSingletonServiceProviderImpl} represents implementations of
41  * {@link ClusterSingletonServiceProvider} and it implements {@link GenericEntityOwnershipListener}
42  * for providing OwnershipChange for all registered {@link ClusterSingletonServiceGroup} entity
43  * candidate.
44  *
45  * @param <P> the instance identifier path type
46  * @param <E> the GenericEntity type
47  * @param <G> the GenericEntityOwnershipListener type
48  * @param <S> the GenericEntityOwnershipService type
49  */
50 public abstract class AbstractClusterSingletonServiceProviderImpl<P extends HierarchicalIdentifier<P>,
51         E extends GenericEntity<P>, G extends GenericEntityOwnershipListener<E>,
52         S extends GenericEntityOwnershipService<E, G>>
53         implements ClusterSingletonServiceProvider, GenericEntityOwnershipListener<E> {
54     private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterSingletonServiceProviderImpl.class);
55
56     @VisibleForTesting
57     static final @NonNull String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
58     @VisibleForTesting
59     static final @NonNull String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
60
61     private final S entityOwnershipService;
62     private final Map<String, ClusterSingletonServiceGroup<P, E>> serviceGroupMap = new ConcurrentHashMap<>();
63
64     /* EOS Entity Listeners Registration */
65     private Registration serviceEntityListenerReg;
66     private Registration asyncCloseEntityListenerReg;
67
68     /**
69      * Class constructor.
70      *
71      * @param entityOwnershipService relevant EOS
72      */
73     protected AbstractClusterSingletonServiceProviderImpl(final @NonNull S entityOwnershipService) {
74         this.entityOwnershipService = requireNonNull(entityOwnershipService);
75     }
76
77     /**
78      * This method must be called once on startup to initialize this provider.
79      */
80     public final void initializeProvider() {
81         LOG.debug("Initialization method for ClusterSingletonService Provider {}", this);
82         serviceEntityListenerReg = registerListener(SERVICE_ENTITY_TYPE, entityOwnershipService);
83         asyncCloseEntityListenerReg = registerListener(CLOSE_SERVICE_ENTITY_TYPE, entityOwnershipService);
84     }
85
86     @Override
87     public final synchronized ClusterSingletonServiceRegistration registerClusterSingletonService(
88             final ClusterSingletonService service) {
89         LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service, this);
90
91         final String serviceIdentifier = service.getIdentifier().getName();
92         checkArgument(!Strings.isNullOrEmpty(serviceIdentifier),
93             "ClusterSingletonService identifier may not be null nor empty");
94
95         final ClusterSingletonServiceGroup<P, E> serviceGroup;
96         final var existing = serviceGroupMap.get(serviceIdentifier);
97         if (existing == null) {
98             serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
99             serviceGroupMap.put(serviceIdentifier, serviceGroup);
100
101             try {
102                 initializeOrRemoveGroup(serviceGroup);
103             } catch (CandidateAlreadyRegisteredException e) {
104                 throw new IllegalArgumentException("Service group already registered", e);
105             }
106         } else {
107             serviceGroup = existing;
108         }
109
110         final var reg =  new AbstractClusterSingletonServiceRegistration(service) {
111             @Override
112             protected void removeRegistration() {
113                 // We need to bounce the unregistration through a ordered lock in order not to deal with asynchronous
114                 // shutdown of the group and user registering it again.
115                 AbstractClusterSingletonServiceProviderImpl.this.removeRegistration(serviceIdentifier, this);
116             }
117         };
118
119         serviceGroup.registerService(reg);
120         return reg;
121     }
122
123     private ClusterSingletonServiceGroup<P, E> createGroup(final String serviceIdentifier,
124             final List<ClusterSingletonServiceRegistration> services) {
125         return new ClusterSingletonServiceGroupImpl<>(serviceIdentifier, entityOwnershipService,
126             createEntity(SERVICE_ENTITY_TYPE, serviceIdentifier),
127             createEntity(CLOSE_SERVICE_ENTITY_TYPE, serviceIdentifier), services);
128     }
129
130     private void initializeOrRemoveGroup(final ClusterSingletonServiceGroup<P, E> group)
131             throws CandidateAlreadyRegisteredException {
132         try {
133             group.initialize();
134         } catch (CandidateAlreadyRegisteredException e) {
135             serviceGroupMap.remove(group.getIdentifier(), group);
136             throw e;
137         }
138     }
139
140     void removeRegistration(final String serviceIdentifier, final ClusterSingletonServiceRegistration reg) {
141         final PlaceholderGroup<P, E> placeHolder;
142         final ListenableFuture<?> future;
143         synchronized (this) {
144             final var lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier));
145             future = lookup.unregisterService(reg);
146             if (future == null) {
147                 return;
148             }
149
150             // Close the group and replace it with a placeholder
151             LOG.debug("Closing service group {}", serviceIdentifier);
152             placeHolder = new PlaceholderGroup<>(lookup, future);
153
154             final String identifier = reg.getInstance().getIdentifier().getName();
155             verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
156             LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
157
158             lookup.closeClusterSingletonGroup();
159         }
160
161         future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
162     }
163
164     synchronized void finishShutdown(final PlaceholderGroup<P, E> placeHolder) {
165         final String identifier = placeHolder.getIdentifier();
166         LOG.debug("Service group {} closed", identifier);
167
168         final var services = placeHolder.getServices();
169         if (services.isEmpty()) {
170             // No services, we are all done
171             if (serviceGroupMap.remove(identifier, placeHolder)) {
172                 LOG.debug("Service group {} removed", placeHolder);
173             } else {
174                 LOG.debug("Service group {} superseded by {}", placeHolder, serviceGroupMap.get(identifier));
175             }
176             return;
177         }
178
179         // Placeholder is being retired, we are reusing its services as the seed for the group.
180         final var group = createGroup(identifier, services);
181         verify(serviceGroupMap.replace(identifier, placeHolder, group));
182         placeHolder.setSuccessor(group);
183         LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
184
185         try {
186             initializeOrRemoveGroup(group);
187         } catch (CandidateAlreadyRegisteredException e) {
188             LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
189         }
190     }
191
192     @Override
193     public final void close() {
194         LOG.debug("Close method for ClusterSingletonService Provider {}", this);
195
196         if (serviceEntityListenerReg != null) {
197             serviceEntityListenerReg.close();
198             serviceEntityListenerReg = null;
199         }
200
201         final var futures = serviceGroupMap.values().stream()
202             .map(ClusterSingletonServiceGroup::closeClusterSingletonGroup)
203             .toList();
204         final var future = Futures.allAsList(futures);
205         Futures.addCallback(future, new FutureCallback<>() {
206             @Override
207             public void onSuccess(final List<Object> result) {
208                 cleanup();
209             }
210
211             @Override
212             public void onFailure(final Throwable throwable) {
213                 LOG.warn("Unexpected problem by closing ClusterSingletonServiceProvider {}",
214                     AbstractClusterSingletonServiceProviderImpl.this, throwable);
215                 cleanup();
216             }
217         }, MoreExecutors.directExecutor());
218     }
219
220     @Override
221     public final void ownershipChanged(final E entity, final EntityOwnershipStateChange change,
222             final boolean inJeopardy) {
223         LOG.debug("Ownership change for ClusterSingletonService Provider on {} {} inJeopardy={}", entity, change,
224             inJeopardy);
225         final var serviceIdentifier = getServiceIdentifierFromEntity(entity);
226         final var serviceHolder = serviceGroupMap.get(serviceIdentifier);
227         if (serviceHolder != null) {
228             serviceHolder.ownershipChanged(entity, change, inJeopardy);
229         } else {
230             LOG.debug("ClusterSingletonServiceGroup was not found for serviceIdentifier {}", serviceIdentifier);
231         }
232     }
233
234     /**
235      * Method implementation registers the listener.
236      *
237      * @param entityType the type of the entity
238      * @param entityOwnershipServiceInst - EOS type
239      * @return a {@link Registration}
240      */
241     protected abstract Registration registerListener(String entityType, S entityOwnershipServiceInst);
242
243     /**
244      * Creates an extended {@link GenericEntity} instance.
245      *
246      * @param entityType the type of the entity
247      * @param entityIdentifier the identifier of the entity
248      * @return instance of Entity extended GenericEntity type
249      */
250     protected abstract E createEntity(String entityType, String entityIdentifier);
251
252     /**
253      * Method is responsible for parsing ServiceGroupIdentifier from E entity.
254      *
255      * @param entity instance of GenericEntity type
256      * @return ServiceGroupIdentifier parsed from entity key value.
257      */
258     protected abstract String getServiceIdentifierFromEntity(E entity);
259
260     /**
261      * Method is called async. from close method in end of Provider lifecycle.
262      */
263     final void cleanup() {
264         LOG.debug("Final cleaning ClusterSingletonServiceProvider {}", this);
265         if (asyncCloseEntityListenerReg != null) {
266             asyncCloseEntityListenerReg.close();
267             asyncCloseEntityListenerReg = null;
268         }
269         serviceGroupMap.clear();
270     }
271 }