diff --git a/constants/ffmpeg.go b/constants/ffmpeg.go index 285c4c9..91a497c 100644 --- a/constants/ffmpeg.go +++ b/constants/ffmpeg.go @@ -3,8 +3,7 @@ package constants type FILE_TYPE int const ( - Audio192K FILE_TYPE = iota - ImagePng + ImagePng FILE_TYPE = iota Video5M Video3M Video1M @@ -12,6 +11,11 @@ const ( Video400K ) +const ( + Audio192K FILE_TYPE = iota + Audio128K +) + type FFMPEG_KWARGS int const ( @@ -52,6 +56,7 @@ const Scale = "scale" var AudioFileTypeMap = map[FILE_TYPE]string{ Audio192K: "_audio192k.m4a", + Audio128K: "_audio128k.m4a", } var ImageFileTypeMap = map[FILE_TYPE]string{ @@ -68,6 +73,7 @@ var VideoFileTypeMap = map[FILE_TYPE]string{ var AudioBitrateMap = map[FILE_TYPE]string{ Audio192K: "192k", + Audio128K: "128k", } var VideoBitrateMap = map[FILE_TYPE]string{ diff --git a/controllers/process.go b/controllers/process.go index 051fa9b..a355c42 100644 --- a/controllers/process.go +++ b/controllers/process.go @@ -2,6 +2,7 @@ package controllers import ( "encoding/json" + "log" "net/http" "zestream-server/configs" "zestream-server/types" @@ -26,6 +27,36 @@ func (*Process) Video(c *gin.Context) { return } + log.Println("request", request) + + _, channel, queue, ctx, _ := configs.GetRabbitMQ() + + err = utils.PublishEvent(channel, queue, *ctx, jsonBytes) + + if err != nil { + c.JSON(http.StatusExpectationFailed, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusCreated, gin.H{"status": "success"}) +} + +func (*Process) Audio(c *gin.Context) { + var request types.Audio + + if err := c.ShouldBindJSON(&request); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + jsonBytes, err := json.Marshal(request) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + log.Println("request", request) + _, channel, queue, ctx, _ := configs.GetRabbitMQ() err = utils.PublishEvent(channel, queue, *ctx, jsonBytes) diff --git a/main.go b/main.go index f588093..3797090 100644 --- a/main.go +++ b/main.go @@ -19,7 +19,7 @@ func main() { conn, ch, q, _, cancel := configs.InitRabbitMQ() if *isConsumer { - service.VideoProcessConsumer(ch, q) + service.ProcessConsumer(ch, q) return } diff --git a/routes/routes.go b/routes/routes.go index 944c840..56a8af1 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -32,6 +32,7 @@ func Init() *gin.Engine { // /api/v1 apiV1.POST("video/process", process.Video) + apiV1.POST("audio/process", process.Audio) apiV1.GET("url/presigned", controllers.GetPresignedURL) return r diff --git a/service/consumer.go b/service/consumer.go index 7785048..c258e6a 100644 --- a/service/consumer.go +++ b/service/consumer.go @@ -13,7 +13,7 @@ import ( rmq "github.com/rabbitmq/amqp091-go" ) -func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) { +func ProcessConsumer(ch *rmq.Channel, q *rmq.Queue) { log.Println("Running ZeStream as Consumer") var forever chan struct{} @@ -30,13 +30,13 @@ func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) { utils.LogErr(err) msgs, err := ch.Consume( - q.Name, // queue - "VideoProcessConsumer", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args + q.Name, // queue + "ProcessConsumer", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args ) if err != nil { @@ -50,14 +50,27 @@ func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) { guard <- 1 var video types.Video + videoErr := json.Unmarshal(d.Body, &video) + if videoErr != nil { + log.Println(videoErr) + continue + } - err := json.Unmarshal(d.Body, &video) + if video.Type == "mp4" { + go processVideo(&video, guard) + continue + } + + var audio types.Audio + audioErr := json.Unmarshal(d.Body, &audio) if err != nil { - log.Println(err) + log.Println(audioErr) continue } - go processVideo(&video, guard) + if audio.Type == "mp3" { + go processAudio(&audio, guard) + } } }() @@ -81,7 +94,7 @@ func processVideo(video *types.Video, guard <-chan int) { utils.LogErr(err) } - generateDash(videoFileName, video.Watermark) + generateVideoDash(videoFileName, video.Watermark) uploader := utils.GetUploader(constants.CloudContainerNames[constants.Dashes], video.ID) @@ -95,3 +108,29 @@ func processVideo(video *types.Video, guard <-chan int) { <-guard } + +func processAudio(audio *types.Audio, guard <-chan int) { + log.Println("Processing Audio: ", audio) + + var fileName = audio.ID + "." + audio.Type + + err := utils.Fetch(audio.Src, fileName) + if err != nil { + utils.LogErr(err) + return + } + + generateAudioDash(fileName) + + uploader := utils.GetUploader(constants.CloudContainerNames[constants.Dashes], audio.ID) + + outputDir, err := utils.GetOutputFilePathName(fileName, "") + utils.LogErr(err) + + utils.UploadToCloudStorage(uploader, outputDir) + + err = os.RemoveAll(outputDir) + utils.LogErr(err) + + <-guard +} diff --git a/service/dash.go b/service/dash.go index 6f8c595..8505c47 100644 --- a/service/dash.go +++ b/service/dash.go @@ -13,7 +13,7 @@ import ( ffmpeg "github.com/u2takey/ffmpeg-go" ) -func generateDash(fileName string, watermark types.WaterMark) { +func generateVideoDash(fileName string, watermark types.WaterMark) { targetFile, err := utils.GetDownloadFilePathName(fileName) if err != nil { log.Println(err) @@ -45,6 +45,32 @@ func generateDash(fileName string, watermark types.WaterMark) { utils.DeleteFile(targetFile) } +func generateAudioDash(fileName string) { + targetFile, err := utils.GetDownloadFilePathName(fileName) + if err != nil { + log.Println(err) + } + + var fileNameStripped = utils.RemoveExtensionFromFile(fileName) + + outputPath, err := utils.GetOutputFilePathName(fileName, fileNameStripped) + if err != nil { + log.Println(err) + return + } + + var wg sync.WaitGroup + + wg.Add(len(constants.AudioFileTypeMap)) + + go generateAudioFiles(targetFile, outputPath, &wg) + + wg.Wait() + + deleteVideoFiles(outputPath) + utils.DeleteFile(targetFile) +} + func generateAudioFiles(targetFile string, outputPath string, wg *sync.WaitGroup) { for fileType, filePrefix := range constants.AudioFileTypeMap { var outputFile = outputPath + filePrefix diff --git a/types/video.go b/types/media.go similarity index 79% rename from types/video.go rename to types/media.go index 62dab70..d9a5e75 100644 --- a/types/video.go +++ b/types/media.go @@ -1,12 +1,20 @@ package types +type Media struct { + ID string `json:"id" binding:"required"` + Src string `json:"src" binding:"required,url"` + Type string `json:"type" binding:"required"` +} + type Video struct { - ID string `json:"id" binding:"required"` - Src string `json:"src" binding:"required,url"` - Type string `json:"type" binding:"required"` + Media Watermark WaterMark `json:"watermark"` } +type Audio struct { + Media +} + type WaterMark struct { ID string `json:"id" binding:"required_if=Watermark 1"` Src string `json:"src" binding:"required_if=Watermark 1"` diff --git a/utils/fetch.go b/utils/fetch.go index 71096fa..7b3b30f 100644 --- a/utils/fetch.go +++ b/utils/fetch.go @@ -15,7 +15,7 @@ Fetch downlods a file to downloads folder from the given url, and names it as given fileName */ func Fetch(url string, fileName string) error { - log.Println("Downloading Video: ", url) + log.Println("Downloading: ", url) newFileName, err := GetDownloadFilePathName(fileName) if err != nil {