-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathavro.ts
176 lines (149 loc) Β· 5.86 KB
/
avro.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import {Command, flags} from '@oclif/command'
import avro from 'avsc'
import chalk from 'chalk'
import fs from 'fs' // includes all from avro-js and some more
import Json2Csv from 'json-2-csv'
import Logger from '../utilities/logger'
import Utilities from '../utilities/utilities'
/**
 * `avro` command: inspects and converts Avro object container files.
 *
 * Operations (chosen with the positional `command` argument or the `-c` flag):
 *  - get_schema: print the schema embedded in the Avro file given with -f,
 *                or write it (compact JSON) to -o when provided
 *  - to_json:    decode -f and append every record, JSON-stringified, to -o
 *                (records are written back-to-back with no separator)
 *  - to_csv:     decode -f and append every record as CSV to -o
 *                (header only before the first record)
 *  - to_avro:    encode JSON records (obtained via Utilities.getInputString)
 *                into -o using the schema file given with -t
 */
export default class Avro extends Command {
  static description = 'Avro Utility command'

  static GET_SCHEMA = 'get_schema'
  static TO_JSON = 'to_json'
  static TO_AVRO = 'to_avro'
  static TO_CSV = 'to_csv'

  // Order is user-visible: it is how the list appears in help and error messages.
  // NOTE(review): an older note said a getCommand() depends on this order; no such
  // method exists in this class — confirm nothing elsewhere relies on it.
  static SupportedCommands = [Avro.GET_SCHEMA, Avro.TO_JSON, Avro.TO_AVRO, Avro.TO_CSV]

  static flags = {
    help: flags.help({char: 'h'}),
    command: flags.string({char: 'c', description: `commands supported: ${Avro.SupportedCommands}`}),
    file: flags.string({char: 'f', description: 'input file path'}),
    output: flags.string({char: 'o', description: 'output file path'}),
    schemaType: flags.string({char: 't', description: 'schema type file path'}),
  }

  static args = [{name: 'command'}] // operation type

  /**
   * Entry point: validates input, output and operation, then runs the operation.
   * The returned promise settles only after the operation's output is complete,
   * and rejects (reported by oclif) if decoding/encoding fails.
   */
  async run() {
    const {args, flags} = this.parse(Avro)
    this.checkParameters(flags, args)
    await this.executeCommand(flags, args)
  }

  /**
   * Reports missing required parameters via Logger.error and normalises the
   * command name (the -c flag overrides the positional argument; lower-cased).
   * NOTE(review): assumes Logger.error aborts the command — confirm; otherwise
   * execution continues with invalid parameters.
   */
  private checkParameters(flags: any, args: any) {
    if (!flags.file) {
      Logger.error(this, 'Input file is not provided')
    }
    if (flags.command) { // if -c flag has a value, it overrides the positional argument
      args.command = flags.command
    }
    if (args.command) {
      args.command = args.command.toLowerCase()
    } else {
      Logger.error(this, 'Command is empty or not provided, supported:' + Avro.SupportedCommands)
    }
    // output is not mandatory for the 'get_schema' command
    if (args.command !== Avro.GET_SCHEMA && !flags.output) {
      Logger.error(this, 'Output file is not provided')
    }
  }

  /** Dispatches to the operation handler; the promise settles when it finishes. */
  private executeCommand(flags: any, args: any): Promise<void> {
    switch (args.command) {
    case Avro.GET_SCHEMA:
      return this.getSchema(flags, args)
    case Avro.TO_JSON:
      return this.toJson(flags, args)
    case Avro.TO_AVRO:
      return this.toAvro(flags, args)
    case Avro.TO_CSV:
      return this.toCsv(flags, args)
    default:
      Logger.error(this, 'Unsupported Command, supported: ' + Avro.SupportedCommands)
      return Promise.resolve()
    }
  }

  /**
   * Reads the schema from the Avro file header. Writes it as compact JSON to
   * -o when given, otherwise prints it pretty-printed. Resolves once handled.
   */
  // tslint:disable-next-line:no-unused
  private getSchema(flags: any, args: any): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      avro.createFileDecoder(flags.file)
        .on('metadata', (type: any) => {
          const output = type.schema()
          if (flags.output) {
            Utilities.writeStringToFile(this, flags.output, JSON.stringify(output))
          } else {
            Logger.success(this,
              `${chalk.yellow('Avro Schema')}\n${JSON.stringify(output, null, ' ')}`
            )
          }
          resolve()
        })
        .on('error', reject)
    })
  }

  /**
   * Truncates -o, then appends each decoded record as JSON (no separator between
   * records). Success is reported only after the decoder has emitted 'end'.
   */
  // tslint:disable-next-line:no-unused
  private toJson(flags: any, args: any): Promise<void> {
    Logger.progressStart(this, 'Converting Avro To Json')
    Logger.progressStop(this, ' Converting Avro To Json')
    Utilities.truncateFile(this, flags.output)
    return new Promise<void>((resolve, reject) => {
      avro.createFileDecoder(flags.file)
        .on('data', (record: any) => {
          Utilities.appendStringToFile(this, flags.output, JSON.stringify(record))
        })
        .on('end', () => {
          Logger.success(this, `${chalk.blue('Json')} written to file: ${chalk.green(flags.output)}`)
          resolve()
        })
        .on('error', reject)
    })
  }

  /**
   * Truncates -o, then appends each decoded record as CSV (header only for the
   * first record). json2csv reports through a callback that may fire after the
   * decoder ends, so completion waits for both 'end' and all pending callbacks.
   */
  // tslint:disable-next-line:no-unused
  private toCsv(flags: any, args: any): Promise<void> {
    Logger.progressStart(this, 'Converting Avro To Csv')
    Logger.progressStop(this, ' Converting Avro To Csv')
    Utilities.truncateFile(this, flags.output)
    let prependHeader = true // only write the header before the first record
    return new Promise<void>((resolve, reject) => {
      let pending = 0 // json2csv conversions whose callback has not fired yet
      let decodeEnded = false
      let failed = false
      const finishIfDone = () => {
        if (!failed && decodeEnded && pending === 0) {
          Logger.success(this, `${chalk.blue('Csv')} written to file: ${chalk.green(flags.output)}`)
          resolve()
        }
      }
      const fail = (error: Error) => {
        failed = true
        reject(error)
      }
      avro.createFileDecoder(flags.file)
        .on('data', (record: any) => {
          // round-trip through JSON to turn the decoded record into a plain object
          const json = JSON.parse(JSON.stringify(record))
          pending++
          Json2Csv.json2csv(json, (err?: Error, csv?: string) => {
            pending--
            if (csv) {
              Utilities.appendStringToFile(this, flags.output, csv + '\n')
            }
            if (err) {
              fail(err)
              return
            }
            finishIfDone()
          }, {prependHeader})
          prependHeader = false
        })
        .on('end', () => {
          decodeEnded = true
          finishIfDone()
        })
        .on('error', fail)
    })
  }

  /**
   * Encodes JSON records into an Avro object container file at -o using the
   * schema file given with -t. Records failing schema validation are skipped
   * with a warning. Input is parsed before the output file is opened, so
   * malformed input does not leave an empty/truncated file behind. Resolves
   * after the output stream has flushed ('finish').
   */
  private toAvro(flags: any, args: any): Promise<void> {
    if (!flags.schemaType)
      Logger.error(this, 'Schema file is not provided')
    Logger.progressStart(this, 'Generating Avro')
    Logger.progressStop(this, ' Generating Avro')
    const schema = avro.parse(flags.schemaType)

    const inputString = Utilities.getInputString(this, flags, args)
    const records = JSON.parse(this.convertAvroJsonToValidJson(inputString))

    // The block encoder serialises written records into an object container file.
    const output = fs.createWriteStream(flags.output)
    const avroEncoder = new avro.streams.BlockEncoder(schema)
    const written = new Promise<void>((resolve, reject) => {
      output.on('finish', () => resolve())
      output.on('error', reject)
      avroEncoder.on('error', reject) // pipe() does not forward source errors
    })
    avroEncoder.pipe(output)

    records.forEach((data: any) => {
      if (schema.isValid(data)) {
        avroEncoder.write(data)
      } else {
        Logger.warn(this, `${chalk.yellow('[SKIPPING RECORD]')} schema is invalid: ${chalk.yellowBright(JSON.stringify(data))}`)
      }
    })
    avroEncoder.end()

    return written.then(() => {
      Logger.success(this, `${chalk.blue('Avro')} written to file: ${chalk.green(flags.output)}`)
    })
  }

  /**
   * Wraps a sequence of JSON values into the text of one JSON array.
   *
   * Accepts values that are concatenated (`{..}{..}`, as produced by to_json),
   * whitespace/newline separated, or already comma separated. Commas are
   * inserted only between adjacent top-level objects/arrays. Text inside string
   * literals (including whitespace and braces) is copied verbatim.
   *
   * @param json raw input text
   * @returns `[` + values joined by commas + `]`; `"[]"` for empty input
   */
  private convertAvroJsonToValidJson(json: string): string {
    const isJsonWhitespace = (c: string) => c === ' ' || c === '\n' || c === '\r' || c === '\t'
    let result = '['
    let depth = 0
    let inString = false
    let escaped = false
    let afterValue = false // a top-level value just closed and no comma has followed yet
    for (let i = 0; i < json.length; i++) {
      const ch = json.charAt(i)
      if (inString) {
        result += ch
        if (escaped) {
          escaped = false
        } else if (ch === '\\') {
          escaped = true
        } else if (ch === '"') {
          inString = false
        }
        continue
      }
      if (depth === 0 && afterValue && !isJsonWhitespace(ch)) {
        if (ch !== ',') {
          result += ','
        }
        afterValue = false
      }
      if (ch === '"') {
        inString = true
      } else if (ch === '{' || ch === '[') {
        depth++
      } else if (ch === '}' || ch === ']') {
        depth--
        if (depth === 0) {
          afterValue = true
        }
      }
      result += ch
    }
    return result + ']'
  }
}