48
48
import com .hortonworks .registries .schemaregistry .errors .SchemaBranchAlreadyExistsException ;
49
49
import com .hortonworks .registries .schemaregistry .errors .SchemaBranchNotFoundException ;
50
50
import com .hortonworks .registries .schemaregistry .errors .SchemaNotFoundException ;
51
+ import com .hortonworks .registries .schemaregistry .exceptions .RegistryRetryableException ;
51
52
import com .hortonworks .registries .schemaregistry .serde .SerDesException ;
52
53
import com .hortonworks .registries .schemaregistry .serde .SnapshotDeserializer ;
53
54
import com .hortonworks .registries .schemaregistry .serde .SnapshotSerializer ;
106
107
import java .util .concurrent .ConcurrentHashMap ;
107
108
import java .util .concurrent .ExecutionException ;
108
109
import java .util .concurrent .TimeUnit ;
110
+ import java .util .concurrent .locks .Lock ;
111
+ import java .util .concurrent .locks .ReadWriteLock ;
112
+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
109
113
110
114
import static com .hortonworks .registries .schemaregistry .client .SchemaRegistryClient .Configuration .DEFAULT_CONNECTION_TIMEOUT ;
111
115
import static com .hortonworks .registries .schemaregistry .client .SchemaRegistryClient .Configuration .DEFAULT_READ_TIMEOUT ;
@@ -157,11 +161,15 @@ public class SchemaRegistryClient implements ISchemaRegistryClient {
157
161
private static final Set <Class <?>> SERIALIZER_INTERFACE_CLASSES = Sets .<Class <?>>newHashSet (SnapshotSerializer .class , PullSerializer .class );
158
162
private static final String SEARCH_FIELDS = SCHEMA_REGISTRY_PATH + "/search/schemas/fields" ;
159
163
private static Subject subject ;
164
+ private static ReadWriteLock kerberosSynchronizationLock = null ;
165
+ private static final long KERBEROS_SYNCHRONIZATION_TIMEOUT_MS = 180000 ;
160
166
161
167
static {
162
168
String jaasConfigFile = System .getProperty ("java.security.auth.login.config" );
163
169
if (jaasConfigFile != null && !jaasConfigFile .trim ().isEmpty ()) {
164
- KerberosLogin kerberosLogin = new KerberosLogin ();
170
+ kerberosSynchronizationLock = new ReentrantReadWriteLock (true );
171
+ Lock kerberosTGTRenewalLock = kerberosSynchronizationLock .writeLock ();
172
+ KerberosLogin kerberosLogin = new KerberosLogin (kerberosTGTRenewalLock , KERBEROS_SYNCHRONIZATION_TIMEOUT_MS );
165
173
kerberosLogin .configure (new HashMap <>(), REGISTY_CLIENT_JAAS_SECTION );
166
174
try {
167
175
subject = kerberosLogin .login ().getSubject ();
@@ -447,7 +455,7 @@ public void deleteSchema(String schemaName) throws SchemaNotFoundException {
447
455
}
448
456
449
457
WebTarget target = currentSchemaRegistryTargets ().schemasTarget .path (String .format ("%s" , schemaName ));
450
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
458
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
451
459
@ Override
452
460
public Response run () {
453
461
return target .request (MediaType .APPLICATION_JSON_TYPE ).delete (Response .class );
@@ -516,7 +524,7 @@ public SchemaIdVersion uploadSchemaVersion(final String schemaBranchName,
516
524
.bodyPart (streamDataBodyPart );
517
525
518
526
Entity <MultiPart > multiPartEntity = Entity .entity (multipartEntity , MediaType .MULTIPART_FORM_DATA );
519
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
527
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
520
528
@ Override
521
529
public Response run () {
522
530
return target .request ().post (multiPartEntity , Response .class );
@@ -575,7 +583,7 @@ public void deleteSchemaVersion(SchemaVersionKey schemaVersionKey) throws Schema
575
583
576
584
WebTarget target = currentSchemaRegistryTargets ().schemasTarget .path (String .format ("%s/versions/%s" , schemaVersionKey
577
585
.getSchemaName (), schemaVersionKey .getVersion ()));
578
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
586
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
579
587
@ Override
580
588
public Response run () {
581
589
return target .request (MediaType .APPLICATION_JSON_TYPE ).delete (Response .class );
@@ -606,7 +614,7 @@ private SchemaIdVersion doAddSchemaVersion(String schemaBranchName, String schem
606
614
607
615
WebTarget target = currentSchemaRegistryTargets ().schemasTarget .path (schemaName ).path ("/versions" ).queryParam ("branch" , schemaBranchName )
608
616
.queryParam ("disableCanonicalCheck" , disableCanonicalCheck );
609
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
617
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
610
618
@ Override
611
619
public Response run () {
612
620
return target .request (MediaType .APPLICATION_JSON_TYPE ).post (Entity .json (schemaVersion ), Response .class );
@@ -759,7 +767,7 @@ public void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFound
759
767
@ Override
760
768
public SchemaVersionMergeResult mergeSchemaVersion (Long schemaVersionId , boolean disableCanonicalCheck ) throws SchemaNotFoundException , IncompatibleSchemaException {
761
769
WebTarget target = currentSchemaRegistryTargets ().schemasTarget .path (schemaVersionId + "/merge" ).queryParam ("disableCanonicalCheck" , disableCanonicalCheck );
762
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
770
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
763
771
@ Override
764
772
public Response run () {
765
773
return target .request ().post (null );
@@ -795,7 +803,7 @@ public SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMach
795
803
@ Override
796
804
public SchemaBranch createSchemaBranch (Long schemaVersionId , SchemaBranch schemaBranch ) throws SchemaBranchAlreadyExistsException , SchemaNotFoundException {
797
805
WebTarget target = currentSchemaRegistryTargets ().schemasTarget .path ("versionsById/" + schemaVersionId + "/branch" );
798
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
806
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
799
807
@ Override
800
808
public Response run () {
801
809
return target .request (MediaType .APPLICATION_JSON_TYPE ).post (Entity .json (schemaBranch ), Response .class );
@@ -819,7 +827,7 @@ public Response run() {
819
827
@ Override
820
828
public Collection <SchemaBranch > getSchemaBranches (String schemaName ) throws SchemaNotFoundException {
821
829
WebTarget target = currentSchemaRegistryTargets ().schemasTarget .path (encode (schemaName ) + "/branches" );
822
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
830
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
823
831
@ Override
824
832
public Response run () {
825
833
return target .request ().get ();
@@ -839,7 +847,7 @@ public Response run() {
839
847
@ Override
840
848
public void deleteSchemaBranch (Long schemaBranchId ) throws SchemaBranchNotFoundException , InvalidSchemaBranchDeletionException {
841
849
WebTarget target = currentSchemaRegistryTargets ().schemasTarget .path ("branch/" + schemaBranchId );
842
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
850
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
843
851
@ Override
844
852
public Response run () {
845
853
return target .request ().delete ();
@@ -868,7 +876,7 @@ private boolean transitionSchemaVersionState(Long schemaVersionId,
868
876
byte [] transitionDetails ) throws SchemaNotFoundException , SchemaLifecycleException {
869
877
870
878
WebTarget webTarget = currentSchemaRegistryTargets ().schemaVersionsTarget .path (schemaVersionId + "/state/" + operationOrTargetState );
871
- Response response = Subject . doAs ( subject , new PrivilegedAction <Response >() {
879
+ Response response = synchronizeDoAction ( new PrivilegedAction <Response >() {
872
880
@ Override
873
881
public Response run () {
874
882
return webTarget .request ().post (Entity .text (transitionDetails ));
@@ -919,7 +927,7 @@ public CompatibilityResult checkCompatibility(String schemaName, String toSchema
919
927
public CompatibilityResult checkCompatibility (String schemaBranchName , String schemaName ,
920
928
String toSchemaText ) throws SchemaNotFoundException {
921
929
WebTarget webTarget = currentSchemaRegistryTargets ().schemasTarget .path (encode (schemaName ) + "/compatibility" ).queryParam ("branch" , schemaBranchName );
922
- String response = Subject . doAs ( subject , new PrivilegedAction <String >() {
930
+ String response = synchronizeDoAction ( new PrivilegedAction <String >() {
923
931
@ Override
924
932
public String run () {
925
933
return webTarget .request ().post (Entity .text (toSchemaText ), String .class );
@@ -953,7 +961,7 @@ public String uploadFile(InputStream inputStream) {
953
961
MultiPart multiPart = new MultiPart ();
954
962
BodyPart filePart = new StreamDataBodyPart ("file" , inputStream , "file" );
955
963
multiPart .bodyPart (filePart );
956
- return Subject . doAs ( subject , new PrivilegedAction <String >() {
964
+ return synchronizeDoAction ( new PrivilegedAction <String >() {
957
965
@ Override
958
966
public String run () {
959
967
return currentSchemaRegistryTargets ().filesTarget .request ()
@@ -964,7 +972,7 @@ public String run() {
964
972
965
973
@ Override
966
974
public InputStream downloadFile (String fileId ) {
967
- return Subject . doAs ( subject , new PrivilegedAction <InputStream >() {
975
+ return synchronizeDoAction ( new PrivilegedAction <InputStream >() {
968
976
@ Override
969
977
public InputStream run () {
970
978
return currentSchemaRegistryTargets ().filesTarget .path ("download/" + encode (fileId ))
@@ -1086,7 +1094,7 @@ private <T> T createInstance(SerDesInfo serDesInfo, boolean isSerializer) {
1086
1094
}
1087
1095
1088
1096
private <T > List <T > getEntities (WebTarget target , Class <T > clazz ) {
1089
- String response = Subject . doAs ( subject , new PrivilegedAction <String >() {
1097
+ String response = synchronizeDoAction ( new PrivilegedAction <String >() {
1090
1098
@ Override
1091
1099
public String run () {
1092
1100
return target .request (MediaType .APPLICATION_JSON_TYPE ).get (String .class );
@@ -1111,7 +1119,7 @@ private <T> List<T> parseResponseAsEntities(String response, Class<T> clazz) {
1111
1119
}
1112
1120
1113
1121
private <T > T postEntity (WebTarget target , Object json , Class <T > responseType ) {
1114
- String response = Subject . doAs ( subject , new PrivilegedAction <String >() {
1122
+ String response = synchronizeDoAction ( new PrivilegedAction <String >() {
1115
1123
@ Override
1116
1124
public String run () {
1117
1125
return target .request (MediaType .APPLICATION_JSON_TYPE ).post (Entity .json (json ), String .class );
@@ -1130,7 +1138,7 @@ private <T> T readEntity(String response, Class<T> clazz) {
1130
1138
}
1131
1139
1132
1140
private <T > T getEntity (WebTarget target , Class <T > clazz ) {
1133
- String response = Subject . doAs ( subject , new PrivilegedAction <String >() {
1141
+ String response = synchronizeDoAction ( new PrivilegedAction <String >() {
1134
1142
@ Override
1135
1143
public String run () {
1136
1144
return target .request (MediaType .APPLICATION_JSON_TYPE ).get (String .class );
@@ -1140,6 +1148,30 @@ public String run() {
1140
1148
return readEntity (response , clazz );
1141
1149
}
1142
1150
1151
+ private <T > T synchronizeDoAction (PrivilegedAction <T > action ) {
1152
+ if (kerberosSynchronizationLock == null ) {
1153
+ return Subject .doAs (subject , action );
1154
+ } else {
1155
+ Lock schemaRegistryLock = kerberosSynchronizationLock .readLock ();
1156
+ try {
1157
+ if (schemaRegistryLock .tryLock (KERBEROS_SYNCHRONIZATION_TIMEOUT_MS , TimeUnit .MILLISECONDS )) {
1158
+ T result ;
1159
+ try {
1160
+ result = Subject .doAs (subject , action );
1161
+ } finally {
1162
+ schemaRegistryLock .unlock ();
1163
+ }
1164
+ return result ;
1165
+ } else {
1166
+ throw new RegistryRetryableException ("Timed out while schema registry client was waiting for Kerberos TGT renewal" );
1167
+ }
1168
+ } catch (InterruptedException e ) {
1169
+ throw new RegistryRetryableException ("Error while schema registry client was waiting for Kerberos TGT renewal" , e );
1170
+ }
1171
+ }
1172
+ }
1173
+
1174
+
1143
1175
public static final class Configuration {
1144
1176
// we may want to remove schema.registry prefix from configuration properties as these are all properties
1145
1177
// given by client.
0 commit comments