1
1
package au .csiro .variantspark .api
2
2
3
- import au .csiro .variantspark .input .{CsvLabelSource , FeatureSource , VCFFeatureSource , VCFSource }
3
+ import au .csiro .variantspark .input .{
4
+ CsvLabelSource ,
5
+ FeatureSource ,
6
+ VCFFeatureSource ,
7
+ VCFSource ,
8
+ CsvFeatureSource ,
9
+ CsvStdFeatureSource ,
10
+ UnionedFeatureSource
11
+ }
4
12
import org .apache .hadoop .fs .FileSystem
5
13
import org .apache .spark .sql .SparkSession
6
14
import org .apache .spark .sql .SQLContext
7
- import au .csiro .variantspark .input .CsvFeatureSource
8
- import au .csiro .variantspark .input .CsvFeatureSource ._
9
15
import com .github .tototoshi .csv .CSVFormat
10
16
import au .csiro .variantspark .input .DefaultCSVFormatSpec
11
17
import org .apache .hadoop .conf .Configuration
12
18
import org .apache .spark .SparkContext
19
+ import org .apache .spark .rdd .RDD
20
+ import scala .collection .JavaConverters ._
13
21
14
22
trait SqlContextHolder {
15
23
def sqlContext : SQLContext
@@ -28,7 +36,7 @@ class VSContext(val spark: SparkSession) extends SqlContextHolder {
28
36
29
37
/** Import features from a VCF file
30
38
* @param inputFile path to file or directory with VCF files to load
31
- * @return FeatureSource loaded from the VCF file or files
39
+ * @return FeatureSource loaded from the VCF file
32
40
*/
33
41
def importVCF (inputFile : String , sparkPar : Int = 0 ): FeatureSource = {
34
42
val vcfSource =
@@ -37,12 +45,62 @@ class VSContext(val spark: SparkSession) extends SqlContextHolder {
37
45
}
38
46
39
47
/** Import features from a CSV file
40
- * @param inputFile: path to file or directory with VCF files to load
48
+ * @param inputFile: path to CSV file
49
+ * @param optVariableTypes: optional type specifications
50
+ * @param csvFormat: [[com.github.tototoshi.csv.CSVFormat ]] row format
51
+ * @return FeatureSource loaded from the CSV file
52
+ */
53
+ def importTransposedCSV (inputFile : String ,
54
+ optVariableTypes : Option [RDD [String ]], csvFormat : CSVFormat ): FeatureSource = {
55
+ CsvFeatureSource (sc.textFile(inputFile), csvFormat = csvFormat,
56
+ optVariableTypes = optVariableTypes)
57
+ }
58
+ def importTransposedCSV (inputFile : String ,
59
+ variableTypes : java.util.ArrayList [String ] = null ): FeatureSource = {
60
+ val csvFormat : CSVFormat = DefaultCSVFormatSpec
61
+ val optVariableTypes : Option [RDD [String ]] = Option (variableTypes).map { types =>
62
+ sc.parallelize(types.asScala.toSeq)
63
+ }
64
+ importTransposedCSV(inputFile, optVariableTypes, csvFormat)
65
+ }
66
+ def importTransposedCSV (inputFile : String ): FeatureSource = {
67
+ val csvFormat : CSVFormat = DefaultCSVFormatSpec
68
+ val optVariableTypes : Option [RDD [String ]] = None
69
+ importTransposedCSV(inputFile, optVariableTypes, csvFormat)
70
+ }
71
+
72
+ /** Import features from a transposed CSV file
73
+ * @param inputFile: path to CSV file
74
+ * @param optVariableTypes: optional type specifications
41
75
* @param csvFormat: [[com.github.tototoshi.csv.CSVFormat ]] row format
42
- * @return FeatureSource loaded from the VCF file or files
76
+ * @return FeatureSource loaded from CSV file
77
+ */
78
+ def importStdCSV (inputFile : String ,
79
+ optVariableTypes : Option [RDD [String ]], csvFormat : CSVFormat ): FeatureSource = {
80
+ CsvStdFeatureSource (sc.textFile(inputFile), csvFormat = csvFormat,
81
+ optVariableTypes = optVariableTypes)
82
+ }
83
+ def importStdCSV (inputFile : String ,
84
+ variableTypes : java.util.ArrayList [String ] = null ): FeatureSource = {
85
+ val csvFormat : CSVFormat = DefaultCSVFormatSpec
86
+ val optVariableTypes : Option [RDD [String ]] = Option (variableTypes).map { types =>
87
+ sc.parallelize(types.asScala.toSeq)
88
+ }
89
+ importStdCSV(inputFile, optVariableTypes, csvFormat)
90
+ }
91
+ def importStdCSV (inputFile : String ): FeatureSource = {
92
+ val csvFormat : CSVFormat = DefaultCSVFormatSpec
93
+ val optVariableTypes : Option [RDD [String ]] = None
94
+ importStdCSV(inputFile, optVariableTypes, csvFormat)
95
+ }
96
+
97
  /** Combine FeatureSource objects (typically a genotype source and a covariate source)
    * into a single source that exposes the features of both.
    * @param featureSource FeatureSource object containing genotype information
    * @param covariateSource FeatureSource object containing covariate information
    * @return a [[au.csiro.variantspark.input.UnionedFeatureSource]] wrapping both inputs
    */
  def unionFeaturesAndCovariates(featureSource: FeatureSource,
      covariateSource: FeatureSource): FeatureSource = {
    UnionedFeatureSource(featureSource, covariateSource)
  }
47
105
48
106
/** Loads labels from a column in a CSV file
0 commit comments