Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Initial push #2

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -10,3 +10,11 @@

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*


.gradle/
build/
!gradle/wrapper/gradle-wrapper.jar

*.iml
.idea/
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
language: java
jdk:
- oraclejdk8
script:
- "./gradlew build"
169 changes: 169 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
group 'io.datanerds'
version '1.0-SNAPSHOT'

def grpcVersion = '1.1.2'
def consulClientVersion = '0.11.3'
def slf4jVersion = '1.7.21'

buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath "com.google.protobuf:protobuf-gradle-plugin:0.8.1"
}
}

subprojects {
apply plugin: 'java'
apply plugin: 'com.google.protobuf'

repositories {
mavenCentral()
mavenLocal()
}

dependencies {
compile "io.grpc:grpc-all:${grpcVersion}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I read at a glance here, you need either grpc-all or the other deps?

compile "io.grpc:grpc-core:${grpcVersion}"
compile "io.grpc:grpc-netty:${grpcVersion}"
compile "io.grpc:grpc-protobuf:${grpcVersion}"
compile "io.grpc:grpc-stub:${grpcVersion}"
compile "com.orbitz.consul:consul-client:${consulClientVersion}"
compile "org.slf4j:slf4j-api:${slf4jVersion}"

testCompile "junit:junit:4.12"
testCompile 'org.hamcrest:hamcrest-library:1.3'
testCompile 'org.awaitility:awaitility:2.0.0'
testCompile 'com.pszymczyk.consul:embedded-consul:0.2.3'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! 👍

}

protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.2.0'
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
}
}
generateProtoTasks {
all()*.plugins {
grpc {
option 'enable_deprecated=false'
}
}
}
}

sourceSets {
main {
proto {
srcDirs += file("${projectDir}/build/generated/source/proto/main/java");
srcDirs += file("${projectDir}/build/generated/source/proto/main/grpc");
srcDirs += file("${projectDir}/build/generated/source/proto/test/java");
srcDirs += file("${projectDir}/build/generated/source/proto/test/grpc");
}
}
}
}

project(":consulnameresolver") {
task javadocJar(type: Jar) {
classifier = 'javadoc'
from javadoc
}

task sourcesJar(type: Jar) {
classifier = 'sources'
from sourceSets.main.allSource
}

artifacts {
archives javadocJar, sourcesJar
}
}

buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.5.3"
}
}


apply plugin: 'io.codearte.nexus-staging'

if (project.hasProperty('staging')) {

nexusStaging {
username = ossrhUser
password = ossrhPassword
}

project(":consulnameresolver") {
apply plugin: 'signing'
apply plugin: 'maven'
apply plugin: 'osgi'

jar {
manifest {
version = project.version
symbolicName = "$project.group.$baseName"
}
}

signing {
sign configurations.archives
}

nexusStaging {
packageGroup = "io.datanerds"
stagingProfileId = "36a7a00c49b56"
}

uploadArchives {
repositories {
mavenDeployer {
beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
repository(url: "https://oss.sonatype.org/service/local/staging/deploy/maven2/") {
authentication(userName: ossrhUser, password: ossrhPassword)
}
pom.project {
name project.name
groupId 'io.datanerds'
description project.description
packaging 'jar'
url 'https://github.com/datanerds-io/ConsulNameResolver'

scm {
connection 'scm:git:https://github.com/datanerds-io/ConsulNameResolver.git'
developerConnection 'scm:git:git@github.com:datanerds-io/ConsulNameResolver.git'
url 'https://github.com/datanerds-io/ConsulNameResolver.git'
}

licenses {
license {
name 'Eclipse Public License 1.0'
url 'https://opensource.org/licenses/EPL-1.0'
distribution 'repo'
}
}

developers {
developer {
id = 'utobi'
name = 'Tobias Ullrich'
email = 'github-2017@ullrich.io'
}
}
}
}
}
}

}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package io.datanerds.grpc;

import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import com.orbitz.consul.Consul;
import com.orbitz.consul.HealthClient;
import com.orbitz.consul.cache.ServiceHealthCache;
import com.orbitz.consul.cache.ServiceHealthKey;
import com.orbitz.consul.model.ConsulResponse;
import com.orbitz.consul.model.health.ServiceHealth;
import com.orbitz.consul.option.ImmutableCatalogOptions;
import io.grpc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class ConsulNameResolver extends NameResolver {
public static final String DEFAULT_ADDRESS = "localhost";
public static final Integer DEFAULT_PORT = 8383;
private static final Logger logger = LoggerFactory.getLogger(ConsulNameResolver.class);
private final String authority;
private final String service;
private Listener listener;
private Consul consul;
private HealthClient healthClient;
private Optional<String> datacenter;
private Optional<List<String>> tags;

public ConsulNameResolver(ConsulQueryParameter parameter) {
this.authority = parameter.consulAddress.orElse(
String.format("%s:%s", DEFAULT_ADDRESS, DEFAULT_PORT)); //todo move to URI
this.service = parameter.service;
this.datacenter = parameter.datacenter;
this.tags = parameter.tags;

HostAndPort consulHostAndPort = HostAndPort.fromString(this.authority);
this.consul = Consul.builder().withHostAndPort(consulHostAndPort).build();
this.healthClient = this.consul.healthClient();
}

@Override
public String getServiceAuthority() {
return this.authority;
}

@Override
public void shutdown() {
}

@Override
public void start(Listener listener) {
Preconditions.checkState(this.listener == null, "already started");
this.listener = Preconditions.checkNotNull(listener, "listener");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error message "listener"? Should probably be more informative: "listener cannot be empty".

logger.debug("Resolving service '{}' using consul at '%{}'", this.service, this.authority);

ServiceHealthCache svHealth =
ServiceHealthCache.newCache(this.healthClient, this.service, true, buildCatalogOptions(), 5);

svHealth.addListener((Map<ServiceHealthKey, ServiceHealth> newValues) -> {
if (newValues.isEmpty()) {
updateListenerWithEmptyResult();
return;
}
listener.onUpdate(generateResolvedServerList(newValues), Attributes.EMPTY);
});

try {
svHealth.start();
runInitialResolve();
} catch (Exception ex) {
throw new RuntimeException("Exception while trying to start consul client for name resolution", ex);
}
}

private ImmutableCatalogOptions buildCatalogOptions() {
ImmutableCatalogOptions.Builder options = ImmutableCatalogOptions.builder();

if (this.datacenter.isPresent()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For conciseness I'd use ifPresent here like so

this.datacenter.ifPresent(d -> options.datacenter(com.google.common.base.Optional.of(d)));

but that's just matter of taste though.

options.datacenter(com.google.common.base.Optional.of(this.datacenter.get()));
}

if (this.tags.isPresent()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.tags.ifPresent(tags -> tags.forEach(options::tag));

this.tags.get().forEach(options::tag);
}
return options.build();
}

private void updateListenerWithEmptyResult() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably rename method to something like propagateServiceUnavailableToListeners?

logger.warn("No servers could be resolved for '{}'", ConsulNameResolver.this.service);
this.listener.onError(Status.UNAVAILABLE.augmentDescription(String.format(
"No servers could be resolved for service '%s' from authority '%s'",
ConsulNameResolver.this.service, ConsulNameResolver.this.authority)));
this.listener.onUpdate(new ArrayList<>(), Attributes.EMPTY);
}

private List<ResolvedServerInfoGroup> generateResolvedServerList(Map<ServiceHealthKey, ServiceHealth> newValues) {
return newValues
.keySet()
.stream()
.map(key -> new InetSocketAddress(key.getHost(), key.getPort()))
.map(socketAddress -> ResolvedServerInfoGroup.builder()
.add(new ResolvedServerInfo(socketAddress, Attributes.EMPTY)))
.map(ResolvedServerInfoGroup.Builder::build).collect(Collectors.toList());
}

private void runInitialResolve() {
ConsulResponse<List<ServiceHealth>> healthyServiceInstances =
this.healthClient.getHealthyServiceInstances(this.service, buildCatalogOptions());
if (healthyServiceInstances.getResponse().isEmpty()) {
this.listener.onError(Status.UNAVAILABLE.augmentDescription(String.format(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate code (see L97)

"No servers could be resolved for service '%s' from authority '%s'",
this.service, this.authority)));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.datanerds.grpc;

import com.google.common.base.Preconditions;
import io.grpc.Attributes;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.List;
import java.util.Map;

public class ConsulNameResolverProvider extends NameResolverProvider {
private static final Logger logger = LoggerFactory.getLogger(ConsulNameResolverProvider.class);

private static final String SCHEME = "consul";
private static final String QUERY_PARAMETER_TAGS = "tag";
private static final String QUERY_PARAMETER_DATACENTER = "dc";

@Override
protected boolean isAvailable() {
return true;
}

@Override
protected int priority() {
return 5;
}

@Override
public String getDefaultScheme() {
return SCHEME;
}

@Override
public NameResolver newNameResolver(URI targetUri, Attributes params) {
//Contract is to return null if resolver is not able to handle uri
try {
return new ConsulNameResolver(validateInputURI(targetUri));
} catch (IllegalArgumentException | NullPointerException ex) {
return null;
}
}

private void validateScheme(URI uri) {
if (!SCHEME.equalsIgnoreCase(uri.getScheme())) {
throw new IllegalArgumentException("scheme for uri is not 'consul'");
}
}

protected ConsulQueryParameter validateInputURI(final URI uri) {
validateScheme(uri);
Preconditions.checkNotNull(uri.getHost(), "Host cannot be empty");
Preconditions.checkNotNull(uri.getPath(), "Path cannot be empty");
Preconditions.checkArgument(uri.getPath().startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", uri.getPath(), uri);

Map<String, List<String>> splitQuery = URIUtils.splitQuery(uri);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused!

Map<String, List<String>> queryParameters = URIUtils.splitQuery(uri);
Preconditions.checkArgument(!(queryParameters.containsKey(QUERY_PARAMETER_DATACENTER)
&& queryParameters.get(QUERY_PARAMETER_DATACENTER).size() != 1),
"Only one datacenter can be defined");

String consul;
if (uri.getPort() == -1) {
consul = String.format("%s:%s", uri.getHost(), ConsulNameResolver.DEFAULT_PORT);
} else {
consul = String.format("%s:%s", uri.getHost(), uri.getPort());
}

String service = uri.getPath().substring(1);

ConsulQueryParameter.Builder builder = new ConsulQueryParameter.Builder(service)
.withConsulAddress(consul);

if (queryParameters.containsKey(QUERY_PARAMETER_DATACENTER)) {
builder.withDatacenter(queryParameters.get(QUERY_PARAMETER_DATACENTER).get(0));
}
if (queryParameters.containsKey(QUERY_PARAMETER_TAGS)) {
builder.withTags(queryParameters.get(QUERY_PARAMETER_TAGS));
}

return builder.build();
}

}
Loading