Skip to content

Commit

Permalink
Y
Browse files Browse the repository at this point in the history
  • Loading branch information
Wikidepia committed Jan 27, 2025
1 parent eeba8b7 commit 7173ea9
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions handlers/scraper/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package handlers

import (
"errors"
"fmt"
"log/slog"
"net"
"sync"
"sync/atomic"
"time"

"github.com/kelindar/binary"
Expand All @@ -17,6 +17,7 @@ type remoteResult struct {
outChan chan error
}

var sessCount atomic.Int32
var inChan chan remoteResult

func init() {
Expand All @@ -26,7 +27,7 @@ func init() {
if err != nil {
return
}
fmt.Println("listening on", ln.Addr())
slog.Info("remote scraper is listening on", "address", ln.Addr())

go func() {
for {
Expand All @@ -41,12 +42,17 @@ func init() {
}

func handleConnection(conn net.Conn) {
session, err := smux.Server(conn, nil)
smuxConfig := smux.DefaultConfig()
smuxConfig.Version = 2

session, err := smux.Server(conn, smuxConfig)
if err != nil {
return
}
defer session.Close()
defer sessCount.Add(-1)

sessCount.Add(1)
var wg sync.WaitGroup
for {
stream, err := session.AcceptStream()
Expand All @@ -56,24 +62,20 @@ func handleConnection(conn net.Conn) {

wg.Add(1)
go func(stream *smux.Stream) {
defer wg.Done()
defer stream.Close()
defer func() {
stream.Close()
wg.Done()
}()

for rm := range inChan {
if err := stream.SetDeadline(time.Now().Add(timeout)); err != nil {
slog.Error("failed to set deadline", "err", err)
rm.outChan <- err
return
}

buf := []byte(rm.instaData.PostID)
if _, err = stream.Write(buf); err != nil {
slog.Error("failed to write to stream", "err", err)
rm.outChan <- err
return
}

outBuf := make([]byte, 1024*1024) // Smaller, more reasonable buffer
outBuf := make([]byte, 1024*1024)
n, err := stream.Read(outBuf)
if err != nil {
slog.Error("failed to read from stream", "err", err)
Expand All @@ -95,21 +97,25 @@ func handleConnection(conn net.Conn) {
}

func ScrapeRemote(i *InstaData) error {
if sessCount.Load() == 0 {
return errors.New("remote scraper is not running")
}

remoteRes := remoteResult{
instaData: i,
outChan: make(chan error),
}

select {
case inChan <- remoteRes:
case <-time.After(3 * time.Second):
case <-time.After(5 * time.Second):
return errors.New("remote scraper is not running")
}

select {
case err := <-remoteRes.outChan:
return err
case <-time.After(5 * time.Second):
return errors.New("ScrapeRemote timed out")
return errors.New("remote scraper is not running")
}
}

0 comments on commit 7173ea9

Please # to comment.