Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-509: Provide a delete API for schema metadata #510

Merged
merged 1 commit into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Maps;
import com.hortonworks.registries.schemaregistry.SchemaMetadata;
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.exceptions.RegistryException;
Expand All @@ -37,21 +41,31 @@ public class SchemaMetadataCache {
private static final Logger LOG = LoggerFactory.getLogger(SchemaMetadataCache.class);

private final LoadingCache<Key, SchemaMetadataInfo> loadingCache;
private final BiMap<String, Long> schemaNameToIdMap;

/**
 * Builds the schema-metadata cache. Entries expire {@code expiryInSecs} after last
 * access and the cache holds at most {@code size} entries.
 *
 * <p>A single logical entry is reachable under two keys — the schema name and the
 * schema id. The loader therefore also inserts the entry under the "other" key and
 * records the name&lt;-&gt;id association in {@code schemaNameToIdMap} so that
 * invalidation by either key can find its counterpart.
 *
 * @param size                  maximum number of entries to retain
 * @param expiryInSecs          expire-after-access duration, in seconds
 * @param schemaMetadataFetcher callback used to load entries on a cache miss
 */
public SchemaMetadataCache(Long size, Long expiryInSecs, final SchemaMetadataFetcher schemaMetadataFetcher) {
    schemaNameToIdMap = Maps.synchronizedBiMap(HashBiMap.create());
    loadingCache = CacheBuilder.newBuilder()
            .maximumSize(size)
            .expireAfterAccess(expiryInSecs, TimeUnit.SECONDS)
            .build(new CacheLoader<Key, SchemaMetadataInfo>() {
                @Override
                public SchemaMetadataInfo load(Key key) throws Exception {
                    SchemaMetadataInfo schemaMetadataInfo;
                    Key otherKey;
                    if (key.getName() != null) {
                        // Loaded by name: derive the id-keyed alias for the same entry.
                        schemaMetadataInfo = schemaMetadataFetcher.fetch(key.getName());
                        otherKey = Key.of(schemaMetadataInfo.getId());
                        schemaNameToIdMap.put(key.getName(), schemaMetadataInfo.getId());
                    } else if (key.getId() != null) {
                        // Loaded by id: derive the name-keyed alias for the same entry.
                        schemaMetadataInfo = schemaMetadataFetcher.fetch(key.getId());
                        otherKey = Key.of(schemaMetadataInfo.getSchemaMetadata().getName());
                        schemaNameToIdMap.put(schemaMetadataInfo.getSchemaMetadata().getName(), schemaMetadataInfo.getId());
                    } else {
                        throw new RegistryException("Key should have name or id as non null");
                    }
                    // NOTE(review): writing to the cache from inside its own loader is
                    // permitted by Guava but discouraged; consider deriving the alias
                    // lookup from schemaNameToIdMap alone — TODO confirm.
                    loadingCache.put(otherKey, schemaMetadataInfo);
                    return schemaMetadataInfo;
                }
            });
}
Expand Down Expand Up @@ -79,7 +93,19 @@ public SchemaMetadataInfo get(Key key) {

return schemaMetadataInfo;
}


/**
 * Invalidates the cache entries for the given schema under both of its keys:
 * the one supplied and, when known, its counterpart (id for a name key, name
 * for an id key) resolved via {@code schemaNameToIdMap}.
 *
 * <p>The name&lt;-&gt;id association itself is kept; it is refreshed on the next load.
 *
 * @param key name-based or id-based key of the schema metadata to evict
 */
public void invalidateSchemaMetadata(SchemaMetadataCache.Key key) {
    LOG.info("Invalidating cache entry for key [{}]", key);

    // Guava's invalidate() is a no-op for absent keys, so no getIfPresent pre-check is needed.
    loadingCache.invalidate(key);

    // Resolve the counterpart key. The mapping may be absent when the entry was never
    // loaded; the original code would then build a Key around null, so guard explicitly.
    Key otherKey = null;
    if (key.id == null) {
        Long id = schemaNameToIdMap.get(key.name);
        if (id != null) {
            otherKey = Key.of(id);
        }
    } else {
        String name = schemaNameToIdMap.inverse().get(key.id);
        if (name != null) {
            otherKey = Key.of(name);
        }
    }
    if (otherKey != null) {
        loadingCache.invalidate(otherKey);
    }
}

/**
 * Inserts (or replaces) the cache entry for {@code key} directly, bypassing the loader.
 * Unlike the loader path, this does not register the entry under its counterpart key.
 *
 * @param key                name-based or id-based cache key
 * @param schemaMetadataInfo value to associate with {@code key}
 */
public void put(Key key, SchemaMetadataInfo schemaMetadataInfo) {
loadingCache.put(key, schemaMetadataInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ public SchemaRegistryClient(File confFile) throws IOException {

public SchemaRegistryClient(Map<String, ?> conf) {
configuration = new Configuration(conf);

ClientConfig config = createClientConfig(conf);
ClientBuilder clientBuilder = JerseyClientBuilder.newBuilder()
.withConfig(config)
Expand Down Expand Up @@ -425,6 +424,33 @@ public SchemaMetadataInfo getSchemaMetadataInfo(Long schemaMetadataId) {
return schemaMetadataCache.get(SchemaMetadataCache.Key.of(schemaMetadataId));
}

/**
 * Deletes the schema metadata identified by {@code schemaName}, and all of its versions,
 * via {@code DELETE /schemas/{name}} on the registry server. Client-side caches for the
 * metadata and every known version are invalidated before the remote call so subsequent
 * reads do not serve stale entries.
 *
 * @param schemaName name of the schema metadata to delete
 * @throws SchemaNotFoundException when the server responds 404 for {@code schemaName}
 * @throws RuntimeException        when the server responds 500
 */
@Override
public void deleteSchema(String schemaName) throws SchemaNotFoundException {
    Collection<SchemaVersionInfo> schemaVersionInfos = getAllVersions(schemaName);
    schemaMetadataCache.invalidateSchemaMetadata(SchemaMetadataCache.Key.of(schemaName));
    if (schemaVersionInfos != null) {
        for (SchemaVersionInfo schemaVersionInfo : schemaVersionInfos) {
            SchemaIdVersion schemaIdVersion = new SchemaIdVersion(schemaVersionInfo.getId());
            schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
        }
    }

    // The path segment is the raw schema name; String.format("%s", ...) was redundant.
    final WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(schemaName);
    Response response = Subject.doAs(subject, new PrivilegedAction<Response>() {
        @Override
        public Response run() {
            return target.request(MediaType.APPLICATION_JSON_TYPE).delete(Response.class);
        }
    });

    int status = response.getStatus();
    if (status == Response.Status.NOT_FOUND.getStatusCode()) {
        throw new SchemaNotFoundException(response.readEntity(String.class));
    } else if (status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
        throw new RuntimeException(response.readEntity(String.class));
    }
}

@Override
public SchemaIdVersion addSchemaVersion(SchemaMetadata schemaMetadata, SchemaVersion schemaVersion) throws
InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ public interface ISchemaRegistryService {
*/
SchemaMetadataInfo getSchemaMetadataInfo(Long schemaMetadataId);

/**
 * Deletes the schema metadata and all related data for the given {@code schemaName};
 * throws a {@link SchemaNotFoundException} if the schema is absent.
 *
 * @param schemaName schema metadata name
 *
 * @throws SchemaNotFoundException when there is no schema with the given {@code schemaName}
 */
void deleteSchema(String schemaName) throws SchemaNotFoundException;

/**
* Returns version of the schema added with the given schemaInfo.
* <pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,38 @@ public SchemaMetadataInfo getSchemaMetadataInfo(Long schemaMetadataId) {
return schemaMetadataInfo;
}

@Override
public void deleteSchema(String schemaName) throws SchemaNotFoundException {
Collection<SchemaVersionInfo> schemaVersionInfos = getAllVersions(schemaName);
Long schemaMetadataId = null;
// Remove all the schema version state entities for this schema name, invalidate relevant caches and notify all HA servers
if (schemaVersionInfos != null) {
for (SchemaVersionInfo schemaVersionInfo: schemaVersionInfos) {
invalidateCachesAndNotifyAllHAServers(schemaVersionInfo);
schemaMetadataId = schemaVersionInfo.getSchemaMetadataId();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, instead of invalidating one schema version at a time. I think we should expose a separate endpoint to invalidate schema metadata as a whole. It possible that during the process of invalidation there could be network outage, we might end up just invalidating a subset of versions leaving the schema metadata in a partial delete state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@raju-saravanan Sorry, forgot to reply to your comment. That's a good point and it's cleaner. Can we do it as part of a separate PR and merge this in if everything looks good? Reason is that we are anyways retrying notifying all HA servers at https://github.com/hortonworks/registry/blob/master/schema-registry/core/src/main/java/com/hortonworks/registries/schemaregistry/HAServerNotificationManager.java#L60 and even putting the current thread to sleep for 100 ms every time. So the delete call just won't return. And a network outage and out of sync could happen even if we provide a notifyHAServer at schema metadata level as well.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@priyank5485: Agreed, but by making a single call to the each and every HA server, the call could either fail or succeed, depending upon the network connectivity, in other words, it has a transaction like property. The cache would reflect the old snapshot of database if network call didn't go through, or latest snapshot of the database if the network call went through but it would never reflect an intermediate state. This is approach would be a minor improvement, so we can take it up in a different PR.

List<QueryParam> queryParams = new ArrayList<>();
queryParams.add(new QueryParam(SchemaVersionStateStorable.SCHEMA_VERSION_ID, schemaVersionInfo.getId().toString()));
Collection<SchemaVersionStateStorable> schemaVersionStateStorables = storageManager.find(SchemaVersionStateStorable.NAME_SPACE, queryParams);
if (schemaVersionStateStorables != null) {
for (SchemaVersionStateStorable schemaVersionStateStorable : schemaVersionStateStorables) {
storageManager.remove(schemaVersionStateStorable.getStorableKey());
}
}
}
}
// Remove all serdes mappings for this schema name
Collection<SchemaSerDesMapping> schemaSerDesMappings = getSchemaSerDesMappings(schemaMetadataId);
if (schemaSerDesMappings != null) {
for (SchemaSerDesMapping schemaSerDesMapping: schemaSerDesMappings) {
storageManager.remove(schemaSerDesMapping.getStorableKey());
}
}
// Finally remove the schema metadata entry that will remove other related entries on cascade at DB level
SchemaMetadataStorable schemaMetadataStorable = new SchemaMetadataStorable();
schemaMetadataStorable.setName(schemaName);
storageManager.remove(schemaMetadataStorable.getStorableKey());
}

@Override
public SchemaMetadataInfo getSchemaMetadataInfo(String schemaName) {
SchemaMetadataStorable givenSchemaMetadataStorable = new SchemaMetadataStorable();
Expand Down Expand Up @@ -943,4 +975,16 @@ private void invalidateSchemaBranchInAllHAServers(SchemaBranchCache.Key key) {

haServerNotificationManager.notifyCacheInvalidation(schemaBranchCache.getCacheType(),keyAsString);
}

/**
 * Evicts the cache entries associated with one schema version — every branch it
 * belongs to, plus the version itself — and propagates the invalidation to all
 * HA servers.
 *
 * @param schemaVersionInfo the schema version whose cached state should be cleared
 */
private void invalidateCachesAndNotifyAllHAServers(SchemaVersionInfo schemaVersionInfo) {
    // First the branches this version sits on, if any.
    Collection<SchemaBranch> branches = schemaVersionLifecycleManager.getSchemaBranches(schemaVersionInfo.getId());
    if (branches != null) {
        for (SchemaBranch branch : branches) {
            invalidateSchemaBranchInAllHAServers(SchemaBranchCache.Key.of(branch.getId()));
        }
    }
    // Then the version entry itself, keyed by (name, version).
    SchemaVersionKey versionKey =
            new SchemaVersionKey(schemaVersionInfo.getName(), schemaVersionInfo.getVersion());
    schemaVersionLifecycleManager.invalidateSchemaInAllHAServer(SchemaVersionInfoCache.Key.of(versionKey));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ public void invalidateSchemaVersionCache(SchemaVersionInfoCache.Key key) {
schemaVersionInfoCache.invalidateSchema(key);
}

private void invalidateSchemaInAllHAServer(SchemaVersionInfoCache.Key key) {
public void invalidateSchemaInAllHAServer(SchemaVersionInfoCache.Key key) {
schemaVersionInfoCache.invalidateSchema(key);

String keyAsString;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,24 @@ public Response getSchemaInfo(@ApiParam(value = "Schema identifier", required =
return response;
}

/**
 * REST endpoint: deletes a schema metadata and all related data.
 *
 * <p>Responds 200 on success, 404 when no schema with {@code schemaName} exists,
 * and 500 on any other failure.
 */
@DELETE
@Path("/schemas/{name}")
@ApiOperation(value = "Delete a schema metadata and all related data", tags = OPERATION_GROUP_SCHEMA)
@UnitOfWork
public Response deleteSchemaMetadata(@ApiParam(value = "Schema name", required = true) @PathParam("name") String schemaName,
                                     @Context UriInfo uriInfo) {
    Response httpResponse;
    try {
        schemaRegistry.deleteSchema(schemaName);
        httpResponse = WSUtils.respond(Response.Status.OK);
    } catch (SchemaNotFoundException e) {
        LOG.error("No schema metadata found with name: [{}]", schemaName);
        httpResponse = WSUtils.respond(Response.Status.NOT_FOUND, CatalogResponse.ResponseMessage.ENTITY_NOT_FOUND, schemaName);
    } catch (Exception ex) {
        LOG.error("Encountered error while deleting schema with name: [{}]", schemaName, ex);
        httpResponse = WSUtils.respond(Response.Status.INTERNAL_SERVER_ERROR, CatalogResponse.ResponseMessage.EXCEPTION, ex.getMessage());
    }
    return httpResponse;
}

@POST
@Path("/schemas/{name}/versions/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public SchemaMetadataInfo getSchemaMetadataInfo(Long schemaMetadataId) {
return schemaRegistry.getSchemaMetadataInfo(schemaMetadataId);
}

/**
 * Delegates schema deletion to the underlying registry.
 *
 * @param schemaName schema metadata name
 * @throws SchemaNotFoundException when there is no schema with the given {@code schemaName}
 */
@Override
public void deleteSchema(String schemaName) throws SchemaNotFoundException {
schemaRegistry.deleteSchema(schemaName);
}

@Override
public SchemaIdVersion addSchemaVersion(SchemaMetadata schemaMetadata, SchemaVersion schemaVersion)
throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
Expand Down