-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGaea.py
5735 lines (4979 loc) · 271 KB
/
Gaea.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
Created on Tue Sep 11 13:37:40 2018
@author: rjovelin
"""
import json
import subprocess
import time
import pymysql
import os
import argparse
import requests
import uuid
import xml.etree.ElementTree as ET
import gzip
def extract_credentials(credential_file):
    '''
    (str) -> dict
    Returns a dictionary with the database and EGA boxes credentials
    Each non-blank line of the file is expected as "key = value". Only the first
    "=" separates key and value, so values (e.g. passwords) may contain "="
    Parameters
    ----------
    - credential_file (str): Path to the file with the database and EGA box credentials
    '''
    D = {}
    # the with statement closes the file even if a line cannot be parsed
    with open(credential_file) as infile:
        for line in infile:
            # skip blank lines
            if line.strip():
                # split on the first "=" only
                fields = line.rstrip().split('=', 1)
                D[fields[0].strip()] = fields[1].strip()
    return D
def connect_to_database(credential_file, database):
    '''
    (str, str) -> pymysql.connections.Connection
    Open a connection to the EGA database using the DbHost, DbUser and DbPasswd
    entries of the credential file. A first attempt uses utf8 charset and port 3306;
    if it fails, a second attempt uses the pymysql defaults for charset and port
    Parameters
    ----------
    - credential_file (str): Path to the file with the database and EGA box credentials
    - database (str): Name of the database
    Raises ValueError if neither attempt can open a connection
    '''
    # get the database credentials
    credentials = extract_credentials(credential_file)
    DbHost = credentials['DbHost']
    DbUser, DbPasswd = credentials['DbUser'], credentials['DbPasswd']
    try:
        conn = pymysql.connect(host=DbHost, user=DbUser, password=DbPasswd,
                               db=database, charset="utf8", port=3306)
    except Exception:
        # retry with the driver defaults for charset and port
        try:
            conn = pymysql.connect(host=DbHost, user=DbUser, password=DbPasswd, db=database)
        except Exception as err:
            # keep the driver error as the cause for debugging
            raise ValueError('cannot connect to {0} database'.format(database)) from err
    return conn
def show_tables(credential_file, database):
    '''
    (str, str) -> list
    Returns a list of the table names in database
    Parameters
    ----------
    - credential_file (str): Path to the file with the database and EGA box credentials
    - database (str): Name of the database
    '''
    # connect to EGA database
    conn = connect_to_database(credential_file, database)
    try:
        # make a list of database tables
        cur = conn.cursor()
        cur.execute('SHOW TABLES')
        return [row[0] for row in cur]
    finally:
        # close the connection even if the query fails
        conn.close()
def get_working_directory(S, working_dir):
    '''
    (str, str) -> str
    Returns the path of sub-directory S inside working_dir, the location
    where md5sums and encrypted files are written
    Parameters
    ----------
    - S (str): Name of the sub-directory of working_dir for md5sums and encrypted files
    - working_dir (str): Directory where sub-directories used for submissions are written
    '''
    parent, child = working_dir, S
    full_path = os.path.join(parent, child)
    return full_path
def add_working_directory(credential_file, database, table, box, working_dir):
    '''
    (str, str, str, str, str) --> None
    Create a unique working directory in the file system for each alias with "valid"
    status in table and box, and record the directory name in the WorkingDirectory
    column. Then check each alias: if its working directory is recorded and exists,
    its status is updated valid --> encrypt; otherwise the status stays valid and
    an error message is recorded
    Parameters
    ----------
    - credential_file (str): Path to the file with the database and EGA box credentials
    - database (str): Name of the database
    - table (str): Table name in database
    - box (str): EGA box
    - working_dir (str): Directory where sub-directories used for submissions are written
    '''
    # nothing to do if the table does not exist
    if table not in show_tables(credential_file, database):
        return
    conn = connect_to_database(credential_file, database)
    try:
        cur = conn.cursor()
        # get the aliases with valid status
        cur.execute('SELECT {0}.alias FROM {0} WHERE {0}.Status="valid" AND {0}.egaBox=%s'.format(table), (box,))
        for (alias,) in cur.fetchall():
            # name the working directory with a random unique identifier
            UID = str(uuid.uuid4())
            # record identifier in table, then create the directory in the file system
            cur.execute('UPDATE {0} SET {0}.WorkingDirectory=%s WHERE {0}.alias=%s AND {0}.egaBox=%s'.format(table), (UID, alias, box))
            conn.commit()
            os.makedirs(get_working_directory(UID, working_dir))
        # check that working directories were recorded and created
        cur.execute('SELECT {0}.alias, {0}.WorkingDirectory FROM {0} WHERE {0}.Status="valid" AND {0}.egaBox=%s'.format(table), (box,))
        for alias, working_id in cur.fetchall():
            error = []
            if working_id in (None, '', 'NULL', '(null)'):
                error.append('Working directory does not have a valid Id')
            # a NULL identifier cannot be joined to a path, so no directory exists for it
            if working_id is None or not os.path.isdir(get_working_directory(working_id, working_dir)):
                error.append('Working directory not generated')
            if error:
                # error is found, record error message, keep status valid --> valid
                cur.execute('UPDATE {0} SET {0}.errorMessages=%s WHERE {0}.alias=%s AND {0}.egaBox=%s'.format(table), (';'.join(error), alias, box))
            else:
                # no error, update Status valid --> encrypt
                cur.execute('UPDATE {0} SET {0}.Status="encrypt", {0}.errorMessages="None" WHERE {0}.alias=%s AND {0}.egaBox=%s'.format(table), (alias, box))
            conn.commit()
    finally:
        conn.close()
def format_data(L):
    '''
    (list) -> tuple
    Returns a tuple with data to be inserted in a database table.
    Empty strings, None and 'NA' become the string 'NULL'; any other
    value is converted to its string representation
    Parameters
    ----------
    - L (list): List of data to be inserted into database table
    '''
    return tuple('NULL' if value is None or value in ('', 'NA') else str(value) for value in L)
def list_enumerations(URL='https://ega-archive.org/submission-api/v1/'):
    '''
    (str) -> dict
    Returns a dictionary with EGA enumerations as key and dictionary of {value: tag}
    as value. Enumeration names are converted to CamelCase keys
    (e.g. analysis_file_types --> AnalysisFileTypes). An enumeration whose request
    does not return an OK status maps to an empty dictionary
    Precondition: the list of enumerations available from EGA is hard-coded
    Parameters
    ----------
    - URL (str): URL of the API. Default is 'https://ega-archive.org/submission-api/v1/'
    '''
    # make sure the URL ends with a slash, then point to the enumerations endpoint.
    # URLs are built by concatenation: os.path.join would use backslashes on Windows
    enums_url = format_url(URL) + 'enums/'
    # list all enumerations available from EGA
    names = ['analysis_file_types', 'analysis_types', 'case_control', 'dataset_types', 'experiment_types',
             'file_types', 'genders', 'instrument_models', 'library_selections', 'library_sources',
             'library_strategies', 'reference_chromosomes', 'reference_genomes', 'study_types']
    # create a dictionary to store each enumeration
    enums = {}
    for name in names:
        # store the enumeration data {value: tag}
        d = {}
        response = requests.get(enums_url + name)
        if response.status_code == requests.codes.ok:
            for i in response.json()['response']['result']:
                if name == 'instrument_models':
                    # grab label instead of value for unspecified instruments
                    key = i['label'] if i['value'] == 'unspecified' else i['value']
                elif name == 'reference_chromosomes':
                    # group corresponds to tag in reference_genomes. currently, does not support patches
                    # group = 15 --> tag = 15 in reference_genomes = GRCH37
                    # group = 1 --> tag = 1 in reference_genomes = GRCH38
                    if i['group'] not in ['1', '15']:
                        continue
                    key = i['value']
                else:
                    key = i['value']
                assert key not in d
                d[key] = i['tag']
        enums[name.title().replace('_', '')] = d
    return enums
def record_message(credential_file, database, table, box, alias, message, status):
    '''
    (str, str, str, str, str, str, str) -> None
    Update the error message (errorMessages column) or the submission status
    (submissionStatus column) for a given alias and box in database table if
    status is respectively "Error" or "Status". Other values of status do nothing
    Parameters
    ----------
    - credential_file (str): Path to the file with the database and EGA box credentials
    - database (str): Name of the database
    - table (str): Table name in database
    - box (str): EGA box
    - alias (str): Unique alias (primary key) of EGA object in table
    - message (str): Error message or submission status
    - status (str): Type of message to record: "Error" or "Status"
    '''
    columns = {'Error': 'errorMessages', 'Status': 'submissionStatus'}
    conn = connect_to_database(credential_file, database)
    try:
        if status in columns:
            cur = conn.cursor()
            # values are passed as query parameters so that quotes or backslashes
            # in messages and aliases cannot corrupt the SQL statement
            cur.execute('UPDATE {0} SET {0}.{1}=%s WHERE {0}.alias=%s AND {0}.egaBox=%s'.format(table, columns[status]),
                        (str(message), alias, box))
            conn.commit()
    finally:
        conn.close()
def delete_validated_objects_with_errors(credential_file, database, table, box, ega_object, URL, submission_status):
    '''
    (str, str, str, str, str, str, str) -> None
    Deletes from the EGA API the ega_object with submission_status (VALIDATED_WITH_ERRORS,
    VALIDATED or DRAFT) whose alias has submit status in table.
    This step is required prior to submitting metadata for the ega_object if a previous attempt
    didn't complete, otherwise the new submission will result in error because the object already exists
    Parameters
    ----------
    - credential_file (str): File with ega-box and database credentials
    - database (str): Name of the submission database
    - table (str): Table in database
    - box (str): EGA submission box (ega-box-xxxx)
    - ega_object (str): Registered object at the EGA (e.g. studies, runs, samples, analyses)
    - URL (str): URL of the EGA API
    - submission_status (str): ega_object submission status. Valid status:
                               (VALIDATED_WITH_ERRORS, VALIDATED, DRAFT)
    '''
    # grab all aliases with submit status
    conn = connect_to_database(credential_file, database)
    try:
        cur = conn.cursor()
        cur.execute('SELECT {0}.alias FROM {0} WHERE {0}.Status="submit" AND {0}.egaBox=%s'.format(table), (box,))
        data = cur.fetchall()
    except Exception:
        # table may be missing or query may fail: nothing to delete
        data = []
    finally:
        conn.close()
    if not data:
        return
    aliases = [row[0] for row in data]
    # parse credentials to get userName and Password
    credentials = extract_credentials(credential_file)
    submission_data = {"username": box, "password": credentials[box], "loginType": "submitter"}
    # re-format URL if ending slash missing
    URL = format_url(URL)
    # connect to api and get a token
    token = connect_to_api(box, credentials[box], URL)
    headers = {"Content-type": "application/json", "X-Token": token}
    try:
        # retrieve all objects with submission_status
        # NOTE(review): credentials are sent in the body of a GET that is already
        # authenticated by the token; kept as-is, confirm whether the API needs them
        response = requests.get(URL + '{0}?status={1}&skip=0&limit=0'.format(ega_object, submission_status), headers=headers, data=submission_data)
        # map each alias to the id of its first object with submission_status
        object_ids = {}
        for obj in response.json()['response']['result']:
            object_ids.setdefault(obj['alias'], obj['id'])
        for alias in aliases:
            if object_ids.get(alias, '') != '':
                # URL already ends with a slash
                requests.delete(URL + '{0}/{1}'.format(ega_object, object_ids[alias]), headers=headers)
    finally:
        # disconnect from api even if the retrieval or deletion fails
        requests.delete(URL + 'logout', headers={"X-Token": token})
def _api_header_message(response, *fields):
    '''
    (requests.Response, str) -> str
    Returns the given fields of the header of the json response of the EGA API joined by ";",
    or a description of the HTTP status code if the response cannot be parsed
    '''
    try:
        header = response.json()['header']
        return ';'.join(str(header[field]) for field in fields)
    except Exception:
        return 'HTTP status code {0}'.format(response.status_code)


def _register_object(credential_file, database, table, box, ega_object, portal, headers, J):
    '''
    (str, str, str, str, str, str, dict, dict) -> None
    Open a submission, then create, validate and submit the ega_object described by json J
    using the API session in headers. Errors and statuses are recorded in table. Objects that
    are not VALIDATED or not SUBMITTED are deleted. Receipt, accession, status and creation
    date are recorded for SUBMITTED objects
    '''
    alias = J["alias"]
    # open a submission
    submission_json = {"title": "{0} submission".format(ega_object), "description": "opening a submission for {0} {1}".format(ega_object, alias)}
    open_submission = requests.post(portal + 'submissions', headers=headers, data=str(submission_json).replace("'", "\""))
    try:
        submissionId = open_submission.json()['response']['result'][0]['id']
    except Exception:
        record_message(credential_file, database, table, box, alias, 'Cannot obtain a submissionId', 'Error')
        return
    # create object. status --> DRAFT
    object_creation = requests.post(portal + 'submissions/{0}/{1}'.format(submissionId, ega_object), headers=headers, data=str(J).replace("'", "\""))
    try:
        objectId = object_creation.json()['response']['result'][0]['id']
        submission_status = object_creation.json()['response']['result'][0]['status']
    except Exception:
        error = _api_header_message(object_creation, 'userMessage')
        record_message(credential_file, database, table, box, alias, 'Cannot create an object: {0}'.format(error), 'Error')
        return
    record_message(credential_file, database, table, box, alias, submission_status, 'Status')
    # validate object. status --> VALIDATED or VALIDATED_WITH_ERRORS
    object_validation = requests.put(portal + '{0}/{1}?action=VALIDATE'.format(ega_object, objectId), headers=headers)
    try:
        object_status = object_validation.json()['response']['result'][0]['status']
        error_messages = clean_up_error(object_validation.json()['response']['result'][0]['validationErrorMessages'])
    except Exception:
        error = _api_header_message(object_validation, 'userMessage', 'developerMessage')
        record_message(credential_file, database, table, box, alias, 'Cannot obtain validation status: {0}'.format(error), 'Error')
        return
    record_message(credential_file, database, table, box, alias, error_messages, 'Error')
    record_message(credential_file, database, table, box, alias, object_status, 'Status')
    if object_status != 'VALIDATED':
        # delete object so that it can be created again
        requests.delete(portal + '{0}/{1}'.format(ega_object, objectId), headers=headers)
        return
    # submit object
    object_submission = requests.put(portal + '{0}/{1}?action=SUBMIT'.format(ega_object, objectId), headers=headers)
    try:
        error_messages = clean_up_error(object_submission.json()['response']['result'][0]['submissionErrorMessages'])
        object_status = object_submission.json()['response']['result'][0]['status']
    except Exception:
        record_message(credential_file, database, table, box, alias, 'Cannot obtain submission status', 'Error')
        return
    record_message(credential_file, database, table, box, alias, error_messages, 'Error')
    record_message(credential_file, database, table, box, alias, object_status, 'Status')
    if object_status != 'SUBMITTED':
        # delete object
        requests.delete(portal + '{0}/{1}'.format(ega_object, objectId), headers=headers)
        return
    # get the receipt, and the accession id
    try:
        receipt = str(object_submission.json()).replace("\"", "")
        result = object_submission.json()['response']['result'][0]
        # egaAccessionId is None for experiments, but can be obtained from the list of egaAccessionIds
        if ega_object == 'experiments':
            egaAccessionId = result['egaAccessionIds'][0]
        else:
            egaAccessionId = result['egaAccessionId']
    except Exception:
        record_message(credential_file, database, table, box, alias, 'Cannot obtain receipt and/or accession Id', 'Error')
        return
    # store the date it was submitted
    current_time = time.strftime('%Y-%m-%d', time.localtime(time.time()))
    # add receipt, accession and time to table and change status
    conn = connect_to_database(credential_file, database)
    try:
        cur = conn.cursor()
        cur.execute('UPDATE {0} SET {0}.Receipt=%s, {0}.egaAccessionId=%s, {0}.Status=%s, {0}.submissionStatus=%s, {0}.CreationTime=%s WHERE {0}.alias=%s AND {0}.egaBox=%s'.format(table),
                    (receipt, str(egaAccessionId), str(object_status), str(object_status), current_time, alias, box))
        conn.commit()
    finally:
        conn.close()


def register_objects(credential_file, database, table, box, ega_object, portal):
    '''
    (str, str, str, str, str, str) -> None
    Register the ega_objects in EGA box using the submission portal by
    submitting a json for each ega object in table of database with submit status.
    A new API token is obtained for each object and released once the object is processed
    Parameters
    ----------
    - credential_file (str): File with EGA box and database credentials
    - database (str): Name of the submission database
    - table (str): Table in database
    - box (str): EGA submission box (ega-box-xxxx)
    - ega_object (str): Registered object at the EGA. Accepted values:
                        studies, runs, samples, experiments, datasets, analyses, policies, dacs
    - portal (str): URL address of the EGA submission API
    '''
    conn = connect_to_database(credential_file, database)
    try:
        cur = conn.cursor()
        cur.execute('SELECT {0}.Json, {0}.egaAccessionId FROM {0} WHERE {0}.Status="submit" AND {0}.egaBox=%s'.format(table), (box,))
        data = cur.fetchall()
    except Exception:
        data = []
    finally:
        conn.close()
    # check that objects in submit mode do exist
    if not data:
        return
    # make a list of jsons. filter out objects already registered (accession starting with EGA)
    # that have been re-uploaded because not archived. str() guards against NULL accessions
    L = [json.loads(i[0].replace("'", "\""), strict=False) for i in data if not str(i[1]).startswith('EGA')]
    # chromosomeReferences labels stored as the string 'None' are set to None
    for J in L:
        for reference in J.get('chromosomeReferences') or []:
            if reference['label'] == 'None':
                reference['label'] = None
    credentials = extract_credentials(credential_file)
    # make sure portal ends with a slash
    portal = format_url(portal)
    # connect to API and open a submission for each object
    for J in L:
        try:
            token = connect_to_api(box, credentials[box], portal)
        except Exception:
            record_message(credential_file, database, table, box, J["alias"], 'Cannot obtain a token', 'Error')
            continue
        headers = {"Content-type": "application/json", "X-Token": token}
        try:
            _register_object(credential_file, database, table, box, ega_object, portal, headers, J)
        finally:
            # disconnect by removing token, even if registration raised
            close_api_connection(token, portal)
def extract_accessions(credential_file, database, box, table):
    '''
    (str, str, str, str) -> dict
    Returns a dictionary with alias: accessions pairs registered in box for the given object/Table
    Parameters
    ----------
    - credential_file (str): Path to the file with the database and EGA box credentials
    - database (str): Name of the database
    - box (str): EGA box (e.g. ega-box-xxx)
    - table (str): Name of table in database
    '''
    conn = connect_to_database(credential_file, database)
    try:
        cur = conn.cursor()
        # pull down alias and egaId from metadata db, alias should be unique
        cur.execute('SELECT {0}.alias, {0}.egaAccessionId FROM {0} WHERE {0}.egaBox=%s'.format(table), (box,))
        # some PCSI aliases are not unique: the last row returned for an alias is kept
        return {alias: accession for alias, accession in cur}
    finally:
        conn.close()
def map_enumerations():
    '''
    (None) -> dict
    Returns a new dictionary mapping each EGA json Id field to the name
    of the enumeration that lists its valid values
    '''
    return dict(
        experimentTypeId="ExperimentTypes",
        analysisTypeId="AnalysisTypes",
        caseOrControlId="CaseControl",
        genderId="Genders",
        datasetTypeIds="DatasetTypes",
        instrumentModelId="InstrumentModels",
        librarySourceId="LibrarySources",
        librarySelectionId="LibrarySelections",
        libraryStrategyId="LibraryStrategies",
        studyTypeId="StudyTypes",
        chromosomeReferences="ReferenceChromosomes",
        genomeId="ReferenceGenomes",
        fileTypeId="AnalysisFileTypes",
        runFileTypeId="FileTypes",
    )
def get_json_keys(ega_object, action):
    '''
    (str, str) -> list
    Returns a new list of required keys to validate or form the submission json
    Parameters
    ----------
    - ega_object (str): Registered object at the EGA. Accepted values:
                        studies, runs, samples, experiments, datasets, analyses, policies, dacs
    - action (str): String specifying which json keys are required for json validation or formation
                    Accepted values: "validation" or "formation"
    Raises ValueError if ega_object is not an accepted value
    '''
    # keys required to form the json
    formation = {
        'analyses': ['StagePath', 'alias', 'analysisCenter', 'analysisTypeId', 'description',
                     'experimentTypeId', 'files', 'genomeId', 'sampleReferences', 'studyId', 'title'],
        'samples': ['alias', 'caseOrControlId', 'description', 'genderId', 'phenotype', 'title'],
        'datasets': ['alias', 'datasetTypeIds', 'description', 'egaBox', 'policyId', 'title'],
        'studies': ['alias', 'egaBox', 'studyAbstract', 'studyTypeId', 'title'],
        'policies': ['alias', 'dacId', 'egaBox', 'policyText', 'title'],
        'dacs': ['alias', 'contacts', 'egaBox', 'title'],
        'runs': ['alias', 'egaBox', 'experimentId', 'files', 'runFileTypeId', 'sampleId'],
        'experiments': ['alias', 'designDescription', 'egaBox', 'instrumentModelId',
                        'libraryLayoutId', 'libraryName', 'librarySelectionId',
                        'librarySourceId', 'libraryStrategyId', 'pairedNominalLength',
                        'pairedNominalSdev', 'sampleId', 'studyId', 'title']}
    # additional keys checked during validation only
    validation_extra = {
        'analyses': ['egaBox', 'AttributesKey', 'ProjectKey', 'Broker'],
        'samples': ['egaBox', 'AttributesKey']}
    if ega_object not in formation:
        raise ValueError('Unknown ega_object {0}. Accepted values: {1}'.format(ega_object, ', '.join(formation)))
    # copy so that callers never share or mutate the table entries
    required = list(formation[ega_object])
    if action == 'validation':
        required.extend(validation_extra.get(ega_object, []))
    return required
def _validation_errors(d, keys, required, ega_object, enumerations, map_enum, registered):
    '''
    (dict, list, list, str, dict, dict, dict) -> list
    Returns the list of error messages (may contain duplicates) found when checking
    the information d extracted from the submission database for a single alias
    Parameters
    ----------
    - d (dict): Column name: value pairs for a single alias
    - keys (list): Column names in the order of the database query
    - required (list): Keys required for json validation
    - ega_object (str): Registered object at the EGA
    - enumerations (dict): EGA enumerations, as returned by list_enumerations
    - map_enum (dict): Mapping of json Id fields to enumeration names, as returned by map_enumerations
    - registered (dict): Aliases already registered for the same table and box
    '''
    empty = ['', 'NULL', None]
    error = []
    for key in keys:
        value = d[key]
        # check that required information is present
        if key in required and value in empty:
            error.append('Missing required key {0}'.format(key))
        if key == 'alias':
            # check that alias is not already used for the same table and box
            if value in registered:
                error.append('Alias already registered')
            # double underscore is not allowed in runs and analyses alias
            # because alias and file name are retrieved from job name
            # split on double underscore for checking upload and encryption
            if ega_object in ['runs', 'analyses'] and isinstance(value, str) and '__' in value:
                error.append('Double underscore not allowed in runs and analyses alias')
        # check that accessions or aliases are provided
        if key in ['sampleId', 'sampleReferences', 'dacId', 'studyId'] and value in ['', 'None', None, 'NULL']:
            error.append('Missing alias and or accession for {0}'.format(key))
        # check files. absent files are already reported as a missing required key
        if key == 'files' and value not in empty:
            # assumes files is a dict {file_path: file information dict}
            files = json.loads(value.replace("'", "\""))
            for file_path in files:
                if not os.path.isfile(file_path):
                    error.append('Invalid file paths')
                # check validity of file type for analyses objects only. doesn't exist for runs
                if ega_object == 'analyses':
                    file_type = str(files[file_path].get('fileTypeId', '')).lower()
                    if file_type not in enumerations[map_enum['fileTypeId']]:
                        error.append('Invalid fileTypeId')
        # check policy Id
        if key == 'policyId' and 'EGAP' not in str(value):
            error.append('Invalid policyId, should start with EGAP')
        # check library layout
        if key == 'libraryLayoutId' and str(value) not in ['0', '1']:
            error.append('Invalid {0}: should be 0 or 1'.format(key))
        if key in ['pairedNominalLength', 'pairedNominalSdev']:
            try:
                float(value)
            except (TypeError, ValueError):
                error.append('Invalid type for {0}, should be a number'.format(key))
        # check enumerations
        if key in map_enum:
            valid = enumerations[map_enum[key]]
            if key == 'datasetTypeIds':
                # datasetTypeIds can be a list of multiple Ids separated by ';'
                if not isinstance(value, str) or any(k not in valid for k in value.split(';')):
                    error.append('Invalid enumeration for {0}'.format(key))
            elif value not in valid:
                error.append('Invalid enumeration for {0}'.format(key))
        # check custom attributes
        if key == 'attributes' and value not in empty:
            attributes = [json.loads(j.replace("'", "\"")) for j in value.split(';')]
            for k in attributes:
                # do not allow keys other than tag, unit and value
                if not set(k.keys()).issubset({'tag', 'value', 'unit'}):
                    error.append('Invalid {0} format'.format(key))
                # tag and value are both required keys
                if 'tag' not in k or 'value' not in k:
                    error.append('Missing tag and value from {0}'.format(key))
    # check that references are provided for datasets
    if 'runsReferences' in d and 'analysisReferences' in d:
        runs, analyses = d['runsReferences'], d['analysisReferences']
        # at least runsReferences or analysesReferences should include some accessions
        if runs in empty and analyses in empty:
            error.append('Missing runsReferences and analysisReferences')
        if runs not in empty and not all(x.startswith('EGAR') for x in runs.split(';')):
            error.append('Missing runsReferences')
        if analyses not in empty and not all(x.startswith('EGAZ') for x in analyses.split(';')):
            error.append('Missing analysisReferences')
    return error


def is_info_valid(credential_file, metadata_database, submission_database, table, box, ega_object, **KeyWordParams):
    '''
    (str, str, str, str, str, str, dict) -> dict
    Checks if the information stored in the database tables is valid for objects
    with start status and returns a dictionary with an error message for each
    object alias ('NoError' if the information is valid)
    Parameters
    ----------
    - credential_file (str): File with EGA box and database credentials
    - metadata_database (str): Name of the database storing EGA metadata information
    - submission_database (str): Name of the database storing information required for submission
    - table (str): Name of table in database
    - box (str): EGA submission box (ega-box-xxxx)
    - ega_object (str): Registered object at the EGA. Accepted values:
                        studies, runs, samples, experiments, datasets, analyses, policies, dacs
    - KeyWordParams (dict): Optional table arguments. Valid key words: 'attributes' or 'projects'
    '''
    # create a dictionary {alias: error}
    D = {}
    # get the enumerations
    enumerations = list_enumerations()
    # get optional tables
    if 'attributes' in KeyWordParams:
        attributes_table = KeyWordParams['attributes']
    if 'projects' in KeyWordParams:
        projects_table = KeyWordParams['projects']
    # get the json keys
    required = get_json_keys(ega_object, 'validation')
    # build the query. box is passed as a query parameter
    if ega_object == 'analyses':
        # extract information from Analyses, attributes and projects tables
        cmd = ('SELECT {0}.alias, {0}.sampleReferences, {0}.files, {0}.egaBox, '
               '{0}.AttributesKey, {0}.ProjectKey, {1}.title, {1}.description, {1}.attributes, '
               '{1}.genomeId, {1}.StagePath, {2}.studyId, {2}.analysisCenter, {2}.Broker, '
               '{2}.analysisTypeId, {2}.experimentTypeId FROM {0} JOIN {1} JOIN {2} '
               'WHERE {0}.Status="start" AND {0}.egaBox=%s AND {0}.AttributesKey={1}.alias '
               'AND {0}.ProjectKey={2}.alias').format(table, attributes_table, projects_table)
    elif ega_object == 'samples':
        cmd = ('SELECT {0}.alias, {0}.caseOrControlId, {0}.genderId, {0}.phenotype, {0}.egaBox, '
               '{0}.AttributesKey, {1}.title, {1}.description, {1}.attributes FROM {0} JOIN {1} '
               'WHERE {0}.Status="start" AND {0}.egaBox=%s AND {0}.AttributesKey={1}.alias').format(table, attributes_table)
    elif ega_object == 'datasets':
        cmd = ('SELECT {0}.alias, {0}.datasetTypeIds, {0}.policyId, {0}.runsReferences, '
               '{0}.analysisReferences, {0}.title, {0}.description, {0}.datasetLinks, '
               '{0}.attributes, {0}.egaBox FROM {0} WHERE {0}.Status="start" AND {0}.egaBox=%s').format(table)
    elif ega_object == 'experiments':
        cmd = ('SELECT {0}.alias, {0}.title, {0}.instrumentModelId, {0}.librarySourceId, '
               '{0}.librarySelectionId, {0}.libraryStrategyId, {0}.designDescription, {0}.libraryName, '
               '{0}.libraryConstructionProtocol, {0}.libraryLayoutId, {0}.pairedNominalLength, '
               '{0}.pairedNominalSdev, {0}.sampleId, {0}.studyId, {0}.egaBox FROM {0} '
               'WHERE {0}.Status="start" AND {0}.egaBox=%s').format(table)
    elif ega_object == 'studies':
        cmd = ('SELECT {0}.alias, {0}.studyTypeId, {0}.shortName, {0}.title, '
               '{0}.studyAbstract, {0}.ownTerm, {0}.pubMedIds, {0}.customTags, {0}.egaBox FROM {0} '
               'WHERE {0}.Status="start" AND {0}.egaBox=%s').format(table)
    elif ega_object == 'policies':
        cmd = ('SELECT {0}.alias, {0}.dacId, {0}.title, {0}.policyText, {0}.url, {0}.egaBox FROM {0} '
               'WHERE {0}.Status="start" AND {0}.egaBox=%s').format(table)
    elif ega_object == 'dacs':
        cmd = ('SELECT {0}.alias, {0}.title, {0}.contacts, {0}.egaBox FROM {0} '
               'WHERE {0}.status="start" AND {0}.egaBox=%s').format(table)
    elif ega_object == 'runs':
        cmd = ('SELECT {0}.alias, {0}.sampleId, {0}.runFileTypeId, {0}.experimentId, '
               '{0}.files, {0}.egaBox FROM {0} WHERE {0}.status="start" AND {0}.egaBox=%s').format(table)
    else:
        # no query for this object: nothing to check
        return D
    # extract data
    conn = connect_to_database(credential_file, submission_database)
    try:
        cur = conn.cursor()
        cur.execute(cmd, (box,))
        data = cur.fetchall()
        keys = [i[0] for i in cur.description]
    except Exception:
        data = []
    finally:
        conn.close()
    if not data:
        return D
    # map typeId with enumerations
    map_enum = map_enumerations()
    # aliases already registered for the same table and box, extracted once for all records
    registered = extract_accessions(credential_file, metadata_database, box, table)
    for row in data:
        # create a dict with all information
        d = dict(zip(keys, row))
        error = _validation_errors(d, keys, required, ega_object, enumerations, map_enum, registered)
        # join unique error messages in the order they were found
        error = ';'.join(dict.fromkeys(error)) if error else 'NoError'
        assert d['alias'] not in D
        D[d['alias']] = error
    return D
def check_table_information(credential_file, metadata_database, submission_database, table, ega_object, box, **KeyWordParams):
    '''
    (str, str, str, str, str, str, dict) -> None
    Checks that information in the submission database is valid and updates
    the errorMessages column for each alias in table that has status "start"
    in the given box. Aliases without errors also get their status set to "clean".
    Aliases for which no validation result is available get a generic error message.
    Parameters
    ----------
    - credential_file (str): File with EGA box and database credentials
    - metadata_database (str): Name of the database storing EGA metadata information
    - submission_database (str): Name of the database storing information required for submission
    - table (str): Name of table in database
    - ega_object (str): Registered object at the EGA. Accepted values:
                        studies, runs, samples, experiments, datasets, analyses, policies, dacs
    - box (str): EGA submission box (ega-box-xxxx)
    - KeyWordParams (dict): Optional table arguments. Valid key words: 'attributes' or 'projects'
    '''
    # check Table information: {alias: 'NoError' or ';'-joined error messages}
    D = is_info_valid(credential_file, metadata_database, submission_database, table, box, ega_object, **KeyWordParams)
    conn = connect_to_database(credential_file, submission_database)
    try:
        cur = conn.cursor()
        # table names cannot be bound as query parameters and are formatted in,
        # but values are bound so quotes in aliases/boxes cannot break the SQL
        try:
            cur.execute('SELECT {0}.alias FROM {0} WHERE {0}.Status=%s AND {0}.egaBox=%s'.format(table), ('start', box))
            data = [i[0] for i in cur]
        except Exception:
            # best effort: if the lookup fails there is nothing to update
            data = []
        # create dict {alias: errors} for all aliases with start status
        K = {alias: D.get(alias, 'No information. Possible issues with table keys or database connection') for alias in data}
        # update status and record errorMessage
        for alias, message in K.items():
            if message == 'NoError':
                # information is valid: record message and mark alias as clean
                cur.execute('UPDATE {0} SET {0}.errorMessages=%s, {0}.Status=%s WHERE {0}.alias=%s AND {0}.egaBox=%s'.format(table),
                            (message, 'clean', alias, box))
            else:
                # record error only, status is left unchanged
                cur.execute('UPDATE {0} SET {0}.errorMessages=%s WHERE {0}.alias=%s AND {0}.egaBox=%s'.format(table),
                            (message, alias, box))
            conn.commit()
    finally:
        # always release the connection, even if a query raises
        conn.close()
def is_gzipped(file):
    '''
    (str) -> bool
    Returns True if file is gzipped
    Only the first 3 bytes are inspected: the gzip magic number (0x1f 0x8b)
    followed by compression method 0x08 (deflate). Empty or shorter files
    return False.
    Parameters
    ----------
    - file (str): Path to file
    '''
    # read only the header bytes: readline() on binary data could read an
    # arbitrarily large chunk of the file before finding a newline byte
    with open(file, 'rb') as infile:
        header = infile.read(3)
    return header == b'\x1f\x8b\x08'
def open_file(file):
    '''
    (str) -> _io.TextIOWrapper
    Opens file for reading in text mode, decompressing it on the fly
    when its content is gzipped. The caller is responsible for closing it.
    Parameters
    ----------
    - file (str): Path to file, gzipped or not
    '''
    # plain text files are opened directly
    if not is_gzipped(file):
        return open(file)
    # gzipped files: 'rt' wraps the decompressed stream in a text layer
    return gzip.open(file, 'rt')
def extract_contigs_from_vcf(file):
    '''
    (str) -> list
    Returns a list of unique contigs declared in the vcf header (##contig lines),
    or found in the first column of the vcf body if the header declares none.
    Contig names are truncated at the first underscore (eg. chrUn_gl000220 -> chrUn)
    and prefixed with 'chr' unless they already start with it (case-insensitive).
    The order of the returned list is not guaranteed
    Parameters
    ----------
    - file (str): Path to vcf file, gzipped or not
    '''
    def normalize(contig):
        # keep only the part before the first underscore
        contig = contig.split('_', 1)[0]
        if not contig.lower().startswith('chr'):
            contig = 'chr' + contig
        return contig

    header_contigs, body_contigs = set(), set()
    with open_file(file) as infile:
        for line in infile:
            if line.startswith('##contig'):
                # eg. ##contig=<ID=chr1,length=248956422> or ##contig=<ID=chr1>
                # strip newline and closing '>' so an ID-only line isn't polluted
                contig = line.rstrip().rstrip('>').split(',')[0].split('=')[-1]
                header_contigs.add(normalize(contig))
            elif line.startswith('#'):
                # other meta-information lines and the column header
                continue
            elif header_contigs:
                # header already declares the contigs, no need to scan the body
                break
            else:
                # header has no contigs: take the contig from the CHROM column
                contig = line.rstrip().split('\t')[0]
                if contig:
                    body_contigs.add(normalize(contig))
    # remove duplicate names (sets) and prefer the header declarations
    return list(header_contigs or body_contigs)
def extract_contigs_from_tsv(file):
    '''
    (str) -> list
    Returns a list of unique contigs found in the first column of a TSV file (compressed or not)
    Contig names are prefixed with 'chr' when they don't start with it. An existing
    prefix in any case (eg. 'CHR1', 'Chr1') is normalized to 'chr' while the rest of
    the name keeps its case, so 'chrX' stays 'chrX' (consistent with the vcf contigs
    and the chrX/chrY keys of map_chromo_names).
    Blank lines and lines with an empty first field are ignored.
    The order of the returned list is not guaranteed
    Parameters
    ----------
    - file (str): Path to TSV file, gzipped or not
    '''
    chromos = set()
    with open_file(file) as infile:
        for line in infile:
            # contig is the first tab-separated field, surrounding spaces ignored
            contig = line.split('\t', 1)[0].strip()
            if not contig:
                continue
            if contig.lower().startswith('chr'):
                # normalize only the prefix: 'CHRX' -> 'chrX', 'chrX' unchanged
                contig = 'chr' + contig[3:]
            else:
                contig = 'chr' + contig
            chromos.add(contig)
    return list(chromos)
def map_chromo_names():
    '''
    (None) -> dict
    Returns a new dictionary mapping chromosome names (chr1 to chr22, chrX, chrY)
    to their sequence accession names (CM000663 to CM000686)
    '''
    # accessions are consecutive in chromosome order:
    # chr1 -> CM000663, ..., chr22 -> CM000684, chrX -> CM000685, chrY -> CM000686
    chromosomes = ['chr{0}'.format(number) for number in range(1, 23)] + ['chrX', 'chrY']
    return {name: 'CM{0:06d}'.format(663 + offset) for offset, name in enumerate(chromosomes)}
def format_json(D, ega_object):
'''
(dict, str) -> dict
Returns a dictionary with information in the expected submission format or
a dictionary with the object alias if required fields are missing
Precondition: strings in D have double-quotes
Parameters
----------
- D (dict): Dictionary with information extracted from the submission database for a given EGA object
- ega_object (str): Registered object at the EGA. Accepted values:
studies, runs, samples, experiments, datasets, analyses, policies, dacs
'''
# get the EGA enumerations
enumerations = list_enumerations()
# create a dict to be strored as a json. note: strings should have double quotes
J = {}
# get required json keys
required = get_json_keys(ega_object, 'formation')
# map typeId with enumerations
map_enum = map_enumerations()
# map chromosome names for vcf
chromo_to_names = map_chromo_names()
# reverse dictionary
names_to_chromo = {}
for i in chromo_to_names:
names_to_chromo[chromo_to_names[i]] = i
# loop over required json keys
for field in D:
if D[field] in ['NULL', '', None]:
# some fields are required, return empty dict if field is empty
if field in required:
# erase dict and add alias
J = {}
J["alias"] = D["alias"]
# return dict with alias only if required fields are missing
return J
# other fields can be missing, either as empty list or string
else:
# check if field is already recorded. eg: chromosomeReferences may be set already for vcf
if field not in J: