import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.Endpoints;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.EndpointsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterEndpointInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.RegisterL3PrefixEndpointInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.SetEndpointGroupConditionsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.UnregisterEndpointInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.UnsetEndpointGroupConditionsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Builder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Key;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3Prefix;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3PrefixBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.endpoints.EndpointL3PrefixKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.has.endpoint.group.conditions.EndpointGroupCondition;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.has.endpoint.group.conditions.EndpointGroupConditionKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.unregister.endpoint.input.L2;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groupbasedpolicy.endpoint.rev140421.unregister.endpoint.input.L3;
+import org.opendaylight.yangtools.yang.binding.Augmentation;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import com.google.common.util.concurrent.ListenableFuture;
/**
- * Endpoint registry provides a scalable store for accessing and
- * updating information about endpoints.
- * @author readamsO
+ * Endpoint registry provides a scalable store for accessing and updating
+ * information about endpoints.
*/
public class EndpointRpcRegistry implements EndpointService {
// Class logger (SLF4J; use parameterized messages, never string concat in hot paths).
// NOTE(review): this hunk is whitespace-only (trailing-space cleanup on the LOG line).
- private static final Logger LOG =
+ private static final Logger LOG =
LoggerFactory.getLogger(EndpointRpcRegistry.class);
private final DataBroker dataProvider;
// Per-renderer augmentation hooks, keyed by renderer identity; ConcurrentMap because
// renderers register/unregister from arbitrary threads while RPCs iterate it.
private final static ConcurrentMap<String, EpRendererAugmentation> registeredRenderers = new ConcurrentHashMap<String, EpRendererAugmentation>();
-
/**
 * This method registers a renderer for endpoint RPC API. This method
 * ensures single RPC registration for all renderers since a single RPC
 * - the dataProvider
 * @param rpcRegistry
 * - the rpcRegistry
- * @param executor
- * - thread pool executor
 * @param epRendererAugmentation
- * - specific implementation RPC augmentation, if any. Otherwise NULL
+ * - specific implementation RPC augmentation, if any. Otherwise
+ * NULL
 */
// NOTE(review): diff residue -- this hunk is truncated; the tail of register()
// (presumably the registeredRenderers insert and closing braces) is elided context.
// The patch removes the caller-supplied executor: the registry now owns its own
// scheduled pool, sized max(3 * cores, 10), created inside the double-checked
// singleton initialization below. The "@param dataProvider" javadoc tag was lost
// in an earlier hunk -- restore it when the patch lands.
public static void register(DataBroker dataProvider,
- RpcProviderRegistry rpcRegistry, ScheduledExecutorService executor,
+ RpcProviderRegistry rpcRegistry,
EpRendererAugmentation epRendererAugmentation) {
- if (dataProvider == null || rpcRegistry == null || executor == null) {
+ if (dataProvider == null || rpcRegistry == null) {
if (epRendererAugmentation != null) {
// Fixed in this patch: the old message had a {} placeholder but no argument.
- LOG.warn("Couldn't register class {} for endpoint RPC because of missing required info");
+ LOG.warn("Couldn't register class {} for endpoint RPC because of missing required info", epRendererAugmentation);
}
return;
}
// Double-checked locking on the class monitor; endpointRpcRegistry must be
// volatile for this to be safe -- declaration not visible here, TODO confirm.
if (endpointRpcRegistry == null) {
synchronized (EndpointRpcRegistry.class) {
if (endpointRpcRegistry == null) {
+ ScheduledExecutorService executor = Executors
+ .newScheduledThreadPool(Math
+ .max(Runtime.getRuntime()
+ .availableProcessors() * 3, 10));
endpointRpcRegistry = new EndpointRpcRegistry(dataProvider,
rpcRegistry, executor);
}
/**
 * Unregisters a renderer's endpoint-RPC augmentation hook.
 *
- * @param regImp
+ * @param regImp the endpoint augmentation
 * @throws Exception
 */
// NOTE(review): truncated hunk -- the method's opening brace, the declared
// throws clause target, and the registeredRenderers removal are elided context.
public static void unregister(EpRendererAugmentation regImp)
// Last renderer gone: tear down the singleton. Close the RPC registration and
// (added by this patch) shut down the registry-owned executor so its threads
// do not leak, matching the pool now created in register().
if (registeredRenderers.isEmpty()
&& endpointRpcRegistry != null) {
endpointRpcRegistry.rpcRegistration.close();
+ endpointRpcRegistry.executor.shutdown();
endpointRpcRegistry = null;
}
}
}
}
/**
 * Returns the augmentation-context interface implemented by the given
 * binding Augmentation instance, or null when the argument is null or the
 * YANG-tools code-generation contract (exactly one implemented interface)
 * no longer holds.
 *
 * @param augmentation the augmentation instance, may be null
 * @return the single implemented interface, or null
 */
+ public static Class<?> getAugmentationContextType(
+ final Augmentation<?> augmentation) {
+ if (augmentation == null) {
+ return null;
+ }
+ final Class<?>[] augmentationInterfaces = augmentation.getClass()
+ .getInterfaces();
+ if (augmentationInterfaces.length == 1) {
+ return augmentationInterfaces[0];
+ }
+ /*
+ * if here, then the way YANG tools generate augmentation code has
+ * changed, hence augmentation classes are not implemented by single
+ * interface anymore. This is very unlikely to happen, but if it did, we
+ * need to know about it in order to update this method.
+ */
+ LOG.error(
+ "YANG Generated Code has Changed -- augmentation object {} is NOT implemented by one interface anymore",
+ augmentation);
+ return null;
+ }
+
/**
 * Constructor
 *
- * @param dataProvider
- * @param rpcRegistry
- * @param executor
+ * @param dataProvider the {@link DataBroker}
+ * @param rpcRegistry the {@link RpcProviderRegistry}
+ * @param executor the {@link ScheduledExecutorService}
 */
// NOTE(review): truncated hunk -- the creation of rpcRegistration (the branch
// whose "} else" survives below) is elided diff context.
private EndpointRpcRegistry(DataBroker dataProvider,
- RpcProviderRegistry rpcRegistry,
- ScheduledExecutorService executor) {
+ RpcProviderRegistry rpcRegistry,
+ ScheduledExecutorService executor) {
this.dataProvider = dataProvider;
this.executor = executor;
this.rpcRegistry = rpcRegistry;
LOG.debug("Added RPC Implementation Correctly");
} else
rpcRegistration = null;
-
+
// Seed an empty top-level Endpoints container in the operational store so
// later child writes have a parent node.
if (dataProvider != null) {
- InstanceIdentifier<Endpoints> iid =
+ InstanceIdentifier<Endpoints> iid =
InstanceIdentifier.builder(Endpoints.class).build();
WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
- t.put(LogicalDatastoreType.OPERATIONAL,
- iid, new EndpointsBuilder().build());
+ t.put(LogicalDatastoreType.OPERATIONAL,
+ iid, new EndpointsBuilder().build());
CheckedFuture<Void, TransactionCommitFailedException> f = t.submit();
// NOTE(review): the duplicated @Override below shows the onFailure(...)
// callback body was elided from this hunk; commit failures should be
// logged there, not swallowed.
Futures.addCallback(f, new FutureCallback<Void>() {
@Override
@Override
public void onSuccess(Void result) {
-
+
}
});
}
- // XXX TODO - age out endpoint data and remove
+ // TODO Be alagalah - age out endpoint data and remove
// endpoint group/condition mappings with no conditions
}
-
+
/**
- * Construct an endpoint with the appropriate augmentations from the
- * endpoint input. Each concrete implementation can provides its specifics earlier.
- * @param input the input object
+ * Construct an endpoint with the appropriate augmentations from the
+ * endpoint input. Each concrete renderer implementation can provide its
+ * specifics here.
+ *
+ * @param input
+ * the input object
+ * @return endpoint builder carrying every renderer's augmentation
 */
private EndpointBuilder buildEndpoint(RegisterEndpointInput input) {
EndpointBuilder eb = new EndpointBuilder(input);
for (Entry<String, EpRendererAugmentation> entry : registeredRenderers
.entrySet()) {
// Patch changes the hook contract: renderers now RETURN an augmentation
// (null = none) instead of mutating the builder; narrowed catch from
// Throwable to Exception. One renderer failing must not block the rest.
try {
- entry.getValue().buildEndpointAugmentation(eb, input);
- } catch (Throwable t) {
+ Augmentation<Endpoint> augmentation = entry.getValue()
+ .buildEndpointAugmentation(input);
+ if (augmentation != null) {
+ @SuppressWarnings("unchecked")
+ Class<? extends Augmentation<Endpoint>> augmentationType = (Class<? extends Augmentation<Endpoint>>) getAugmentationContextType(augmentation);
+ eb.addAugmentation(augmentationType, augmentation);
+ }
+ } catch (Exception e) {
LOG.warn("Endpoint Augmentation error while processing "
- + entry.getKey() + ". Reason: ", t);
+ + entry.getKey() + ". Reason: ", e);
}
}
return eb;
}
/**
- * Construct an L3 endpoint with the appropriate augmentations from the
- * endpoint input. Each concrete implementation can provides its specifics earlier.
- * @param input the input object
+ * Construct an L3 endpoint with the appropriate augmentations from the
+ * endpoint input. Each concrete renderer implementation can provide its
+ * specifics here.
+ *
+ * @param input
+ * the input object
+ * @return L3 endpoint builder carrying every renderer's augmentation
 */
private EndpointL3Builder buildEndpointL3(RegisterEndpointInput input) {
EndpointL3Builder eb = new EndpointL3Builder(input);
for (Entry<String, EpRendererAugmentation> entry : registeredRenderers
.entrySet()) {
// Same contract change as buildEndpoint: return-an-augmentation style,
// Exception (not Throwable) caught so one bad renderer can't break others.
try {
- entry.getValue().buildEndpointL3Augmentation(eb, input);
- } catch (Throwable t) {
+ Augmentation<EndpointL3> augmentation = entry.getValue()
+ .buildEndpointL3Augmentation(input);
+ if (augmentation != null) {
+ @SuppressWarnings("unchecked")
+ Class<? extends Augmentation<EndpointL3>> augmentationType = (Class<? extends Augmentation<EndpointL3>>) getAugmentationContextType(augmentation);
+ eb.addAugmentation(augmentationType, augmentation);
+ }
+ } catch (Exception e) {
LOG.warn("L3 endpoint Augmentation error while processing "
- + entry.getKey() + ". Reason: ", e);
+ + entry.getKey() + ". Reason: ", e);
+ }
+ }
+ return eb;
+ }
+
+ /**
+ * Construct an L3 prefix endpoint with the appropriate augmentations from
+ * the endpoint input. Each concrete renderer implementation can provide
+ * its specifics here.
+ *
+ * @param input
+ * the input object
+ */
+ private EndpointL3PrefixBuilder buildL3PrefixEndpoint(RegisterL3PrefixEndpointInput input) {
+ EndpointL3PrefixBuilder eb = new EndpointL3PrefixBuilder(input);
+ for (Entry<String, EpRendererAugmentation> entry : registeredRenderers
+ .entrySet()) {
+ try {
// NOTE(review): unlike buildEndpoint/buildEndpointL3 above, this hook still
// uses the old mutate-the-builder signature (eb, input) -- confirm the
// EpRendererAugmentation interface really kept this variant. The warn
// message also says "L3 endpoint" where "L3 prefix endpoint" is meant.
+ entry.getValue().buildL3PrefixEndpointAugmentation(eb, input);
+ } catch (Exception e) {
+ LOG.warn("L3 endpoint Augmentation error while processing "
+ + entry.getKey() + ". Reason: ", e);
+ }
+ }
// NOTE(review): the method's closing brace is elided diff context here.
+ return eb;
// Registers an L2 endpoint and/or its L3 addresses from the RPC input into the
// operational datastore; no input validation yet (see TODO below).
@Override
public Future<RpcResult<Void>>
- registerEndpoint(RegisterEndpointInput input) {
+ registerEndpoint(RegisterEndpointInput input) {
// One timestamp shared by the L2 endpoint and every L3 address written here.
long timestamp = System.currentTimeMillis();
-
+
+ // TODO: Replicate RPC feedback implemented in L3Prefix register for
+ // unmet requirements.
WriteTransaction t = dataProvider.newWriteOnlyTransaction();
if (input.getL2Context() != null &&
- input.getMacAddress() != null) {
+ input.getMacAddress() != null) {
Endpoint ep = buildEndpoint(input)
.setTimestamp(timestamp)
.build();
- EndpointKey key =
+ EndpointKey key =
new EndpointKey(ep.getL2Context(), ep.getMacAddress());
- InstanceIdentifier<Endpoint> iid =
+ InstanceIdentifier<Endpoint> iid =
InstanceIdentifier.builder(Endpoints.class)
- .child(Endpoint.class, key)
- .build();
- t.put(LogicalDatastoreType.OPERATIONAL, iid, ep);
+ .child(Endpoint.class, key)
+ .build();
// Patch adds createMissingParents=true so the write succeeds even if the
// Endpoints parent container is not present yet.
+ t.put(LogicalDatastoreType.OPERATIONAL, iid, ep, true);
}
if (input.getL3Address() != null) {
for (L3Address l3addr : input.getL3Address()) {
- EndpointL3Key key3 = new EndpointL3Key(l3addr.getIpAddress(),
- l3addr.getL3Context())
+ EndpointL3Key key3 = new EndpointL3Key(l3addr.getIpAddress(),
+ l3addr.getL3Context());
EndpointL3 ep3 = buildEndpointL3(input)
- .setIpAddress(key3.getIpAddress())
- .setL3Context(key3.getL3Context())
- .setTimestamp(timestamp)
- .build();
- InstanceIdentifier<EndpointL3> iid_l3 =
+ .setIpAddress(key3.getIpAddress())
+ .setL3Context(key3.getL3Context())
+ .setTimestamp(timestamp)
+ .build();
+ InstanceIdentifier<EndpointL3> iid_l3 =
InstanceIdentifier.builder(Endpoints.class)
- .child(EndpointL3.class, key3)
- .build();
- t.put(LogicalDatastoreType.OPERATIONAL, iid_l3, ep3);
+ .child(EndpointL3.class, key3)
+ .build();
+ t.put(LogicalDatastoreType.OPERATIONAL, iid_l3, ep3, true);
}
}
// Single commit for all puts; futureTrans maps the committed Void to a
// successful RpcResult on the registry executor.
ListenableFuture<Void> r = t.submit();
return Futures.transform(r, futureTrans, executor);
}
/**
 * Registers an L3 prefix endpoint described by the RPC input in the
 * operational datastore. Validates that l3-context, ip-prefix and tenant
 * are present and, when one is missing, returns an immediately-failed
 * RpcResult (ErrorType.RPC) naming the missing field instead of writing.
 *
 * @param input the register-l3-prefix-endpoint RPC input
 * @return future completing with a successful RpcResult once the write commits
 */
+ @Override
+ public Future<RpcResult<Void>> registerL3PrefixEndpoint(RegisterL3PrefixEndpointInput input) {
+
+ if (input.getL3Context() == null) {
+ return Futures.immediateFuture(RpcResultBuilder.<Void> failed()
+ .withError(ErrorType.RPC, "L3 Prefix Endpoint must have L3Context.").build());
+ }
+ if (input.getIpPrefix() == null) {
+ return Futures.immediateFuture(RpcResultBuilder.<Void> failed()
+ .withError(ErrorType.RPC, "L3 Prefix Endpoint must have ip-prefix.").build());
+ }
+
+ if (input.getTenant() == null) {
+ return Futures.immediateFuture(RpcResultBuilder.<Void> failed()
+ .withError(ErrorType.RPC, "L3 Prefix Endpoint must have tenant.").build());
+ }
+
+ WriteTransaction t = dataProvider.newWriteOnlyTransaction();
+
+ long timestamp = System.currentTimeMillis();
+
+ // TODO: Convert IPPrefix into its IPv4/IPv6 canonical form.
+ // See org.apache.commons.net.util.SubnetUtils.SubnetInfo
+
+ EndpointL3PrefixKey epL3PrefixKey = new EndpointL3PrefixKey(input.getIpPrefix(), input.getL3Context());
+
+ EndpointL3Prefix epL3Prefix = buildL3PrefixEndpoint(input).setTimestamp(timestamp).build();
+ InstanceIdentifier<EndpointL3Prefix> iid_l3prefix =
+ InstanceIdentifier.builder(Endpoints.class)
+ .child(EndpointL3Prefix.class, epL3PrefixKey)
+ .build();
// FIX: pass createMissingParents=true, consistent with both puts in
// registerEndpoint (this patch adds the flag there); otherwise the write can
// fail when the Endpoints parent container does not exist yet.
+ t.put(LogicalDatastoreType.OPERATIONAL, iid_l3prefix, epL3Prefix, true);
+
+ ListenableFuture<Void> r = t.submit();
+ return Futures.transform(r, futureTrans, executor);
+ }
+
// Deletes the L2 endpoints and L3 addresses named in the input from the
// operational datastore in one transaction; absent entries delete as no-ops.
@Override
public Future<RpcResult<Void>>
- unregisterEndpoint(UnregisterEndpointInput input) {
+ unregisterEndpoint(UnregisterEndpointInput input) {
WriteTransaction t = dataProvider.newWriteOnlyTransaction();
if (input.getL2() != null) {
for (L2 l2a : input.getL2()) {
- EndpointKey key =
- new EndpointKey(l2a.getL2Context(),
- l2a.getMacAddress());
- InstanceIdentifier<Endpoint> iid =
+ EndpointKey key =
+ new EndpointKey(l2a.getL2Context(),
+ l2a.getMacAddress());
+ InstanceIdentifier<Endpoint> iid =
InstanceIdentifier.builder(Endpoints.class)
- .child(Endpoint.class, key).build();
+ .child(Endpoint.class, key).build();
t.delete(LogicalDatastoreType.OPERATIONAL, iid);
}
}
if (input.getL3() != null) {
for (L3 l3addr : input.getL3()) {
- EndpointL3Key key3 =
- new EndpointL3Key(l3addr.getIpAddress(),
- l3addr.getL3Context());
- InstanceIdentifier<EndpointL3> iid_l3 =
+ EndpointL3Key key3 =
+ new EndpointL3Key(l3addr.getIpAddress(),
+ l3addr.getL3Context());
+ InstanceIdentifier<EndpointL3> iid_l3 =
InstanceIdentifier.builder(Endpoints.class)
- .child(EndpointL3.class, key3)
- .build();
+ .child(EndpointL3.class, key3)
+ .build();
t.delete(LogicalDatastoreType.OPERATIONAL, iid_l3);
}
}
+ // TODO: Implement L3Prefix
ListenableFuture<Void> r = t.submit();
return Futures.transform(r, futureTrans, executor);
}
// Writes each condition from the input under the endpoint group's
// ConditionMapping in the operational store.
// NOTE(review): truncated hunk -- the t.submit()/return tail of this method is
// elided diff context (compare unregisterEndpoint above).
@Override
- public Future<RpcResult<Void>>
- setEndpointGroupConditions(SetEndpointGroupConditionsInput input) {
+ public Future<RpcResult<Void>>
+ setEndpointGroupConditions(SetEndpointGroupConditionsInput input) {
WriteTransaction t = dataProvider.newWriteOnlyTransaction();
- ConditionMappingKey key =
+ ConditionMappingKey key =
new ConditionMappingKey(input.getEndpointGroup());
-
+
- for (EndpointGroupCondition condition: input.getEndpointGroupCondition()) {
- EndpointGroupConditionKey ckey =
+ for (EndpointGroupCondition condition : input.getEndpointGroupCondition()) {
+ EndpointGroupConditionKey ckey =
new EndpointGroupConditionKey(condition.getCondition());
- InstanceIdentifier<EndpointGroupCondition> iid =
+ InstanceIdentifier<EndpointGroupCondition> iid =
InstanceIdentifier.builder(Endpoints.class)
- .child(ConditionMapping.class, key)
- .child(EndpointGroupCondition.class, ckey)
- .build();
+ .child(ConditionMapping.class, key)
+ .child(EndpointGroupCondition.class, ckey)
+ .build();
t.put(LogicalDatastoreType.OPERATIONAL, iid, condition);
}
}
// Deletes each condition from the input from the endpoint group's
// ConditionMapping in the operational store (mirror of setEndpointGroupConditions).
// NOTE(review): truncated hunk -- the t.submit()/return tail is elided diff context.
@Override
- public Future<RpcResult<Void>>
- unsetEndpointGroupConditions(UnsetEndpointGroupConditionsInput input) {
+ public Future<RpcResult<Void>>
+ unsetEndpointGroupConditions(UnsetEndpointGroupConditionsInput input) {
WriteTransaction t = dataProvider.newWriteOnlyTransaction();
- ConditionMappingKey key =
+ ConditionMappingKey key =
new ConditionMappingKey(input.getEndpointGroup());
-
+
- for (EndpointGroupCondition condition: input.getEndpointGroupCondition()) {
- EndpointGroupConditionKey ckey =
+ for (EndpointGroupCondition condition : input.getEndpointGroupCondition()) {
+ EndpointGroupConditionKey ckey =
new EndpointGroupConditionKey(condition.getCondition());
- InstanceIdentifier<EndpointGroupCondition> iid =
+ InstanceIdentifier<EndpointGroupCondition> iid =
InstanceIdentifier.builder(Endpoints.class)
- .child(ConditionMapping.class, key)
- .child(EndpointGroupCondition.class, ckey)
- .build();
+ .child(ConditionMapping.class, key)
+ .child(EndpointGroupCondition.class, ckey)
+ .build();
t.delete(LogicalDatastoreType.OPERATIONAL, iid);
}
}
// Maps a committed transaction's Void result to a successful RpcResult;
// shared by every RPC's Futures.transform call. This hunk is format-only
// (spacing); behavior is unchanged.
Function<Void, RpcResult<Void>> futureTrans =
- new Function<Void,RpcResult<Void>>() {
- @Override
- public RpcResult<Void> apply(Void input) {
- return RpcResultBuilder.<Void>success().build();
- }
- };
+ new Function<Void, RpcResult<Void>>() {
+ @Override
+ public RpcResult<Void> apply(Void input) {
+ return RpcResultBuilder.<Void> success().build();
+ }
+ };
+
}