Skip to content

Commit

Permalink
add sync traffic task
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Aug 26, 2019
1 parent 0244335 commit 22b4af7
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 16 deletions.
3 changes: 1 addition & 2 deletions cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
var SYNC_TIME = 60

func main() {
// TODO v2ray重启了,sidecar会挂

app := cli.NewApp()
app.Name = "v2scar"
Expand Down Expand Up @@ -49,7 +48,7 @@ func main() {
up := v2scar.NewUserPool()
tick := time.Tick(time.Duration(SYNC_TIME) * time.Second)
for {
go v2scar.SyncJob(up)
go v2scar.SyncTask(up)
<-tick
}
}
Expand Down
8 changes: 8 additions & 0 deletions requests.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v2scar

import (
"bytes"
"encoding/json"
"net/http"
)
Expand All @@ -13,3 +14,10 @@ func getJson(c *http.Client, url string, target interface{}) error {
defer r.Body.Close()
return json.NewDecoder(r.Body).Decode(target)
}

func postJson(c *http.Client, url string, dataStruct interface{}) error {
buf := new(bytes.Buffer)
json.NewEncoder(buf).Encode(dataStruct)
_, err := http.Post(url, "application/json", buf)
return err
}
44 changes: 34 additions & 10 deletions cronjob.go → tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,38 @@ var API_ENDPOINT = "127.0.0.1:8080"
var GRPC_ENDPOINT = "http://fun.com/api"

type UserConfig struct {
UserId int `json:"user_id"`
Email string `json:"email"`
UUID string `json:"uuid"`
AlterId uint32 `json:"alter_id"`
Level uint32 `json:"level"`
Enable bool `json:"enable"`
}
type UserTraffic struct {
UserId int `json:"user_id"`
DownloadTraffic int64 `json:"dt"`
UploadTraffic int64 `json:"ut"`
}

type syncReq struct {
UserTraffics []*UserTraffic `json:"user_traffics"`
}

type syncResp struct {
Configs []*UserConfig
Tag string `json:"tag"`
}

func SyncJob(up *UserPool) {
func SyncTask(up *UserPool) {

// Connect to v2ray rpc
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, GRPC_ENDPOINT, grpc.WithInsecure(), grpc.WithBlock())
defer conn.Close()
if err != nil {
log.Fatalf("GRPC连接失败,请检查V2ray是否运行并开放对应grpc端口 当前GRPC地址: %v", GRPC_ENDPOINT)
log.Printf("[WARNING]: GRPC连接失败,请检查V2ray是否运行并开放对应grpc端口 当前GRPC地址: %v", GRPC_ENDPOINT)
return
}

// Init Client
Expand All @@ -46,10 +57,16 @@ func SyncJob(up *UserPool) {
resp := syncResp{}
err = getJson(httpClient, API_ENDPOINT, &resp)
if err != nil {
log.Fatalf("API连接失败,请检查API地址 当前地址: %v", API_ENDPOINT)
log.Printf("[WARNING]: API连接失败,请检查API地址 当前地址: %v", API_ENDPOINT)
return
}

// get user config
initOrUpdateUser(up, proxymanClient, &resp)
syncUserTrafficToServer(up, statClient)

// sync user traffic
syncUserTrafficToServer(up, statClient, httpClient)

}

func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncResp) {
Expand All @@ -58,7 +75,7 @@ func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncR
user, _ := up.GetUserByEmail(cfg.Email)
if user == nil {
// New User
newUser, err := up.CreateUser(cfg.Email, cfg.UUID, cfg.Level, cfg.AlterId, cfg.Enable)
newUser, err := up.CreateUser(cfg.UserId, cfg.Email, cfg.UUID, cfg.Level, cfg.AlterId, cfg.Enable)
if err != nil {
log.Fatalln(err)
}
Expand All @@ -72,7 +89,7 @@ func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncR
user.setEnable(cfg.Enable)
}
if user.Enable && !user.running {
// Start not Running user
// Start Not Running user
AddInboundUser(c, sr.Tag, user)
}
if !user.Enable && user.running {
Expand All @@ -83,15 +100,22 @@ func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncR
}
}

func syncUserTrafficToServer(up *UserPool, c v2stats.StatsServiceClient) {
// TODO sync
log.Println("[INFO] Call syncUserTrafficToServer")
func syncUserTrafficToServer(up *UserPool, c v2stats.StatsServiceClient, hc *http.Client) {
GetAndResetUserTraffic(c, up)

tfs := make([]*UserTraffic, 0, up.GetUsersNum())
for _, user := range up.GetAllUsers() {
tf := user.DownloadTraffic + user.UploadTraffic
if tf > 0 {
log.Printf("[INFO] User: %v Now Used Total Traffic: %v", user.Email, tf)
tfs = append(tfs, &UserTraffic{
UserId: user.UserId,
DownloadTraffic: user.DownloadTraffic,
UploadTraffic: user.UploadTraffic,
})
user.resetTraffic()
}
}

postJson(hc, API_ENDPOINT, &syncReq{UserTraffics: tfs})
log.Printf("[INFO] Call syncUserTrafficToServer ONLINE USER COUNT: %d", len(tfs))
}
20 changes: 16 additions & 4 deletions user.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

// User V2ray User
type User struct {
UserId int `json:"user_id"`
Email string `json:"email"`
UUID string `json:"uuid"`
AlterId uint32 `json:"alter_id"`
Expand All @@ -19,8 +20,9 @@ type User struct {
running bool
}

func newUser(email, uuid string, level, alterId uint32, enable bool) *User {
func newUser(userId int, email, uuid string, level, alterId uint32, enable bool) *User {
return &User{
UserId: userId,
Email: email,
UUID: uuid,
Level: level,
Expand All @@ -37,6 +39,11 @@ func (u *User) setDownloadTraffic(dt int64) {
atomic.StoreInt64(&u.DownloadTraffic, dt)
}

func (u *User) resetTraffic() {
atomic.StoreInt64(&u.DownloadTraffic, 0)
atomic.StoreInt64(&u.UploadTraffic, 0)
}

func (u *User) setEnable(enable bool) {
// NOTE not thread safe!
u.Enable = enable
Expand All @@ -62,14 +69,14 @@ func NewUserPool() *UserPool {
}

// CreateUser get create user
func (up *UserPool) CreateUser(email, uuid string, level, alterId uint32, enable bool) (*User, error) {
func (up *UserPool) CreateUser(userId int, email, uuid string, level, alterId uint32, enable bool) (*User, error) {
up.access.Lock()
defer up.access.Unlock()

if user, found := up.users[email]; found {
return user, errors.New(fmt.Sprintf("User Already Exists Email: %s", email))
return user, errors.New(fmt.Sprintf("UserId: %d Already Exists Email: %s", user.UserId, email))
} else {
user := newUser(email, uuid, level, alterId, enable)
user := newUser(userId, email, uuid, level, alterId, enable)
up.users[user.Email] = user
return user, nil
}
Expand Down Expand Up @@ -98,3 +105,8 @@ func (up *UserPool) GetAllUsers() []*User {
}
return users
}

// GetUsersNum GetUsersNum
func (up *UserPool) GetUsersNum() int {
return len(up.users)
}

0 comments on commit 22b4af7

Please # to comment.