Skip to content

Commit

Permalink
feat: audio
Browse files Browse the repository at this point in the history
  • Loading branch information
Rahul Bhatia authored and Rahul Bhatia committed Dec 9, 2023
1 parent 4825168 commit 21d0e17
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 20 deletions.
10 changes: 8 additions & 2 deletions constants/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package constants
type FILE_TYPE int

const (
Audio192K FILE_TYPE = iota
ImagePng
ImagePng FILE_TYPE = iota
Video5M
Video3M
Video1M
Video800K
Video400K
)

const (
Audio192K FILE_TYPE = iota
Audio128K
)

type FFMPEG_KWARGS int

const (
Expand Down Expand Up @@ -52,6 +56,7 @@ const Scale = "scale"

var AudioFileTypeMap = map[FILE_TYPE]string{
Audio192K: "_audio192k.m4a",
Audio128K: "_audio128k.m4a",
}

var ImageFileTypeMap = map[FILE_TYPE]string{
Expand All @@ -68,6 +73,7 @@ var VideoFileTypeMap = map[FILE_TYPE]string{

var AudioBitrateMap = map[FILE_TYPE]string{
Audio192K: "192k",
Audio128K: "128k",
}

var VideoBitrateMap = map[FILE_TYPE]string{
Expand Down
31 changes: 31 additions & 0 deletions controllers/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"encoding/json"
"log"
"net/http"
"zestream-server/configs"
"zestream-server/types"
Expand All @@ -26,6 +27,36 @@ func (*Process) Video(c *gin.Context) {
return
}

log.Println("request", request)

_, channel, queue, ctx, _ := configs.GetRabbitMQ()

err = utils.PublishEvent(channel, queue, *ctx, jsonBytes)

if err != nil {
c.JSON(http.StatusExpectationFailed, gin.H{"error": err.Error()})
return
}

c.JSON(http.StatusCreated, gin.H{"status": "success"})
}

func (*Process) Audio(c *gin.Context) {
var request types.Audio

if err := c.ShouldBindJSON(&request); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

jsonBytes, err := json.Marshal(request)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

log.Println("request", request)

_, channel, queue, ctx, _ := configs.GetRabbitMQ()

err = utils.PublishEvent(channel, queue, *ctx, jsonBytes)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func main() {
conn, ch, q, _, cancel := configs.InitRabbitMQ()

if *isConsumer {
service.VideoProcessConsumer(ch, q)
service.ProcessConsumer(ch, q)
return
}

Expand Down
1 change: 1 addition & 0 deletions routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func Init() *gin.Engine {

// /api/v1
apiV1.POST("video/process", process.Video)
apiV1.POST("audio/process", process.Audio)
apiV1.GET("url/presigned", controllers.GetPresignedURL)

return r
Expand Down
63 changes: 51 additions & 12 deletions service/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
rmq "github.com/rabbitmq/amqp091-go"
)

func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {
func ProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {
log.Println("Running ZeStream as Consumer")
var forever chan struct{}

Expand All @@ -30,13 +30,13 @@ func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {
utils.LogErr(err)

msgs, err := ch.Consume(
q.Name, // queue
"VideoProcessConsumer", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
q.Name, // queue
"ProcessConsumer", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)

if err != nil {
Expand All @@ -50,14 +50,27 @@ func VideoProcessConsumer(ch *rmq.Channel, q *rmq.Queue) {
guard <- 1

var video types.Video
videoErr := json.Unmarshal(d.Body, &video)
if videoErr != nil {
log.Println(videoErr)
continue
}

err := json.Unmarshal(d.Body, &video)
if video.Type == "mp4" {
go processVideo(&video, guard)
continue
}

var audio types.Audio
audioErr := json.Unmarshal(d.Body, &audio)
if err != nil {
log.Println(err)
log.Println(audioErr)
continue
}

go processVideo(&video, guard)
if audio.Type == "mp3" {
go processAudio(&audio, guard)
}
}
}()

Expand All @@ -81,7 +94,7 @@ func processVideo(video *types.Video, guard <-chan int) {
utils.LogErr(err)
}

generateDash(videoFileName, video.Watermark)
generateVideoDash(videoFileName, video.Watermark)

uploader := utils.GetUploader(constants.CloudContainerNames[constants.Dashes], video.ID)

Expand All @@ -95,3 +108,29 @@ func processVideo(video *types.Video, guard <-chan int) {

<-guard
}

func processAudio(audio *types.Audio, guard <-chan int) {
log.Println("Processing Audio: ", audio)

var fileName = audio.ID + "." + audio.Type

err := utils.Fetch(audio.Src, fileName)
if err != nil {
utils.LogErr(err)
return
}

generateAudioDash(fileName)

uploader := utils.GetUploader(constants.CloudContainerNames[constants.Dashes], audio.ID)

outputDir, err := utils.GetOutputFilePathName(fileName, "")
utils.LogErr(err)

utils.UploadToCloudStorage(uploader, outputDir)

err = os.RemoveAll(outputDir)
utils.LogErr(err)

<-guard
}
28 changes: 27 additions & 1 deletion service/dash.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
ffmpeg "github.com/u2takey/ffmpeg-go"
)

func generateDash(fileName string, watermark types.WaterMark) {
func generateVideoDash(fileName string, watermark types.WaterMark) {
targetFile, err := utils.GetDownloadFilePathName(fileName)
if err != nil {
log.Println(err)
Expand Down Expand Up @@ -45,6 +45,32 @@ func generateDash(fileName string, watermark types.WaterMark) {
utils.DeleteFile(targetFile)
}

func generateAudioDash(fileName string) {
targetFile, err := utils.GetDownloadFilePathName(fileName)
if err != nil {
log.Println(err)
}

var fileNameStripped = utils.RemoveExtensionFromFile(fileName)

outputPath, err := utils.GetOutputFilePathName(fileName, fileNameStripped)
if err != nil {
log.Println(err)
return
}

var wg sync.WaitGroup

wg.Add(len(constants.AudioFileTypeMap))

go generateAudioFiles(targetFile, outputPath, &wg)

wg.Wait()

deleteVideoFiles(outputPath)
utils.DeleteFile(targetFile)
}

func generateAudioFiles(targetFile string, outputPath string, wg *sync.WaitGroup) {
for fileType, filePrefix := range constants.AudioFileTypeMap {
var outputFile = outputPath + filePrefix
Expand Down
14 changes: 11 additions & 3 deletions types/video.go → types/media.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package types

type Media struct {
ID string `json:"id" binding:"required"`
Src string `json:"src" binding:"required,url"`
Type string `json:"type" binding:"required"`
}

type Video struct {
ID string `json:"id" binding:"required"`
Src string `json:"src" binding:"required,url"`
Type string `json:"type" binding:"required"`
Media
Watermark WaterMark `json:"watermark"`
}

type Audio struct {
Media
}

type WaterMark struct {
ID string `json:"id" binding:"required_if=Watermark 1"`
Src string `json:"src" binding:"required_if=Watermark 1"`
Expand Down
2 changes: 1 addition & 1 deletion utils/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Fetch downlods a file to downloads folder from the given url,
and names it as given fileName
*/
func Fetch(url string, fileName string) error {
log.Println("Downloading Video: ", url)
log.Println("Downloading: ", url)

newFileName, err := GetDownloadFilePathName(fileName)
if err != nil {
Expand Down

0 comments on commit 21d0e17

Please # to comment.