From 22b4af715501b5897a02cfc621712af478f767f0 Mon Sep 17 00:00:00 2001 From: ehco1996 Date: Mon, 26 Aug 2019 10:34:42 +0800 Subject: [PATCH] add sync traffic task --- cli/main.go | 3 +-- requests.go | 8 ++++++++ cronjob.go => tasks.go | 44 ++++++++++++++++++++++++++++++++---------- user.go | 20 +++++++++++++++---- 4 files changed, 59 insertions(+), 16 deletions(-) rename cronjob.go => tasks.go (64%) diff --git a/cli/main.go b/cli/main.go index 714f872..3a7a66f 100644 --- a/cli/main.go +++ b/cli/main.go @@ -13,7 +13,6 @@ import ( var SYNC_TIME = 60 func main() { - // TODO v2ray重启了,sidecar会挂 app := cli.NewApp() app.Name = "v2scar" @@ -49,7 +48,7 @@ func main() { up := v2scar.NewUserPool() tick := time.Tick(time.Duration(SYNC_TIME) * time.Second) for { - go v2scar.SyncJob(up) + go v2scar.SyncTask(up) <-tick } } diff --git a/requests.go b/requests.go index 4dc932d..2aecaa9 100644 --- a/requests.go +++ b/requests.go @@ -1,6 +1,7 @@ package v2scar import ( + "bytes" "encoding/json" "net/http" ) @@ -13,3 +14,10 @@ func getJson(c *http.Client, url string, target interface{}) error { defer r.Body.Close() return json.NewDecoder(r.Body).Decode(target) } + +func postJson(c *http.Client, url string, dataStruct interface{}) error { + buf := new(bytes.Buffer) + json.NewEncoder(buf).Encode(dataStruct) + _, err := http.Post(url, "application/json", buf) + return err +} diff --git a/cronjob.go b/tasks.go similarity index 64% rename from cronjob.go rename to tasks.go index bf4e257..12c9a60 100644 --- a/cronjob.go +++ b/tasks.go @@ -15,19 +15,29 @@ var API_ENDPOINT = "127.0.0.1:8080" var GRPC_ENDPOINT = "http://fun.com/api" type UserConfig struct { + UserId int `json:"user_id"` Email string `json:"email"` UUID string `json:"uuid"` AlterId uint32 `json:"alter_id"` Level uint32 `json:"level"` Enable bool `json:"enable"` } +type UserTraffic struct { + UserId int `json:"user_id"` + DownloadTraffic int64 `json:"dt"` + UploadTraffic int64 `json:"ut"` +} + +type syncReq struct { + UserTraffics []*UserTraffic `json:"user_traffics"` +} type syncResp struct { Configs []*UserConfig Tag string `json:"tag"` } -func SyncJob(up *UserPool) { +func SyncTask(up *UserPool) { // Connect to v2ray rpc ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -35,7 +45,8 @@ func SyncJob(up *UserPool) { conn, err := grpc.DialContext(ctx, GRPC_ENDPOINT, grpc.WithInsecure(), grpc.WithBlock()) defer conn.Close() if err != nil { - log.Fatalf("GRPC连接失败,请检查V2ray是否运行并开放对应grpc端口 当前GRPC地址: %v", GRPC_ENDPOINT) + log.Printf("[WARNING]: GRPC连接失败,请检查V2ray是否运行并开放对应grpc端口 当前GRPC地址: %v", GRPC_ENDPOINT) + return } // Init Client @@ -46,10 +57,16 @@ func SyncJob(up *UserPool) { resp := syncResp{} err = getJson(httpClient, API_ENDPOINT, &resp) if err != nil { - log.Fatalf("API连接失败,请检查API地址 当前地址: %v", API_ENDPOINT) + log.Printf("[WARNING]: API连接失败,请检查API地址 当前地址: %v", API_ENDPOINT) + return } + + // get user config initOrUpdateUser(up, proxymanClient, &resp) - syncUserTrafficToServer(up, statClient) + + // sync user traffic + syncUserTrafficToServer(up, statClient, httpClient) + } func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncResp) { @@ -58,7 +75,7 @@ func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncR user, _ := up.GetUserByEmail(cfg.Email) if user == nil { // New User - newUser, err := up.CreateUser(cfg.Email, cfg.UUID, cfg.Level, cfg.AlterId, cfg.Enable) + newUser, err := up.CreateUser(cfg.UserId, cfg.Email, cfg.UUID, cfg.Level, cfg.AlterId, cfg.Enable) if err != nil { log.Fatalln(err) } @@ -72,7 +89,7 @@ func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncR user.setEnable(cfg.Enable) } if user.Enable && !user.running { - // Start not Running user + // Start Not Running user AddInboundUser(c, sr.Tag, user) } if !user.Enable && user.running { @@ -83,15 +100,22 @@ func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncR } } -func syncUserTrafficToServer(up *UserPool, c v2stats.StatsServiceClient) { - // TODO sync - log.Println("[INFO] Call syncUserTrafficToServer") +func syncUserTrafficToServer(up *UserPool, c v2stats.StatsServiceClient, hc *http.Client) { GetAndResetUserTraffic(c, up) + + tfs := make([]*UserTraffic, 0, up.GetUsersNum()) for _, user := range up.GetAllUsers() { tf := user.DownloadTraffic + user.UploadTraffic if tf > 0 { log.Printf("[INFO] User: %v Now Used Total Traffic: %v", user.Email, tf) + tfs = append(tfs, &UserTraffic{ + UserId: user.UserId, + DownloadTraffic: user.DownloadTraffic, + UploadTraffic: user.UploadTraffic, + }) + user.resetTraffic() } } - + postJson(hc, API_ENDPOINT, &syncReq{UserTraffics: tfs}) + log.Printf("[INFO] Call syncUserTrafficToServer ONLINE USER COUNT: %d", len(tfs)) } diff --git a/user.go b/user.go index 23cda48..05ed177 100644 --- a/user.go +++ b/user.go @@ -9,6 +9,7 @@ import ( // User V2ray User type User struct { + UserId int `json:"user_id"` Email string `json:"email"` UUID string `json:"uuid"` AlterId uint32 `json:"alter_id"` @@ -19,8 +20,9 @@ type User struct { running bool } -func newUser(email, uuid string, level, alterId uint32, enable bool) *User { +func newUser(userId int, email, uuid string, level, alterId uint32, enable bool) *User { return &User{ + UserId: userId, Email: email, UUID: uuid, Level: level, @@ -37,6 +39,11 @@ func (u *User) setDownloadTraffic(dt int64) { atomic.StoreInt64(&u.DownloadTraffic, dt) } +func (u *User) resetTraffic() { + atomic.StoreInt64(&u.DownloadTraffic, 0) + atomic.StoreInt64(&u.UploadTraffic, 0) +} + func (u *User) setEnable(enable bool) { // NOTE not thread safe! u.Enable = enable @@ -62,14 +69,14 @@ func NewUserPool() *UserPool { } // CreateUser get create user -func (up *UserPool) CreateUser(email, uuid string, level, alterId uint32, enable bool) (*User, error) { +func (up *UserPool) CreateUser(userId int, email, uuid string, level, alterId uint32, enable bool) (*User, error) { up.access.Lock() defer up.access.Unlock() if user, found := up.users[email]; found { - return user, errors.New(fmt.Sprintf("User Already Exists Email: %s", email)) + return user, errors.New(fmt.Sprintf("UserId: %d Already Exists Email: %s", user.UserId, email)) } else { - user := newUser(email, uuid, level, alterId, enable) + user := newUser(userId, email, uuid, level, alterId, enable) up.users[user.Email] = user return user, nil } @@ -98,3 +105,8 @@ func (up *UserPool) GetAllUsers() []*User { } return users } + +// GetUsersNum GetUsersNum +func (up *UserPool) GetUsersNum() int { + return len(up.users) +}