Skip to content

Commit

Permalink
fix(handler,sever): fixing wrong query and event handle, wrong metric…
Browse files Browse the repository at this point in the history
…s for subs. (#69)
  • Loading branch information
kehiy authored Oct 14, 2024
1 parent fd85392 commit 120bfe5
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 60 deletions.
15 changes: 10 additions & 5 deletions config/config.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# This is default immortal config file.
# This config file contains essential information which is needed for bootstrapping.
# The rest of configs such as limitations or NIP-11 profile must be changed on database config table,
# which is defined and documented on documents/config.md.
# The rest of configs such as limitations or NIP-11 profile must be changed on database config table.

# todo(@ZigBalthazar): move development notes to documents directory and define development docker files.
# developers note: for development, set environment to `"dev"` and make a config.yml and .env beside your build.

# environment determines where and how to read secrets. (dev/prod)
Expand All @@ -15,12 +13,15 @@ ws_server:
# bind is the IP address to be bind and listen on.
# default if local host.
bind: "0.0.0.0"

# port is websocket port to be listen on.
# default is 7777.
port: 9090
port: 7777

# known_bloom_size is the size of bloom filter to check and avoid trying to store/broadcast existing events.
# default is 1M events.
known_bloom_size: 1000000

# bloom_backup_path is the path to store bloom filter when relay shutdown.
# default is immo_bloom_backup.
bloom_backup_path: "immo_bloom_backup"
Expand All @@ -30,17 +31,21 @@ http_server:
# bind is the IP address to be bind and listen on.
# default if local host.
bind: "0.0.0.0"

# port is websocket port to be listen on.
# default is 8888.
port: 8080
port: 8888

# database contains details of database connections and limitations.
database:
# db_name is the name of mongodb related to immortal
# default is immortal.
db_name: "immortal"

# query_timeout_in_ms specifies the maximum duration (in milliseconds) for query execution before timing out.
# default is 3000.
query_timeout_in_ms: 3000

# connection_timeout_in_ms specifies the maximum duration (in milliseconds) that is used for creating connections to the server.
# default is 5000.
connection_timeout_in_ms: 5000
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ func TestLoadfromFile(t *testing.T) {
cfg, err := config.Load("./config.yml")
require.NoError(t, err, "error must be nil.")

assert.Equal(t, uint16(9090), cfg.WebsocketServer.Port)
assert.Equal(t, uint16(7777), cfg.WebsocketServer.Port)
assert.Equal(t, "0.0.0.0", cfg.WebsocketServer.Bind)
}
2 changes: 1 addition & 1 deletion config/parameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *Config) LoadParameters(db *database.Database) error {
AuthRequired: false, // Whether authentication is required for writes
PaymentRequired: false, // Whether payment is required to interact with the relay
RestrictedWrites: false, // Whether writes are restricted to authenticated or paying users
MaxEventTags: 200, // Maximum number of tags allowed in a single event
MaxEventTags: 1000, // Maximum number of tags allowed in a single event
MaxContentLength: 4096, // Maximum content length of an event (in bytes)
CreatedAtLowerLimit: 0, // Earliest timestamp allowed for event creation
CreatedAtUpperLimit: 0, // Latest timestamp allowed for event creation (0 for no limit)
Expand Down
7 changes: 1 addition & 6 deletions handler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,15 @@ package handler
import (
"context"
"errors"
"fmt"

"github.com/dezh-tech/immortal/types/event"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
)

func (h *Handler) HandleEvent(e *event.Event) error {
collName, ok := KindToCollectionName[e.Kind]
if !ok {
return fmt.Errorf("kind %d is not supported yet", e.Kind)
}
coll := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(e.Kind))

coll := h.db.Client.Database(h.db.DBName).Collection(collName)
ctx, cancel := context.WithTimeout(context.Background(), h.db.QueryTimeout)
defer cancel()

Expand Down
37 changes: 31 additions & 6 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var KindToCollectionName = map[types.Kind]string{
types.KindPatches: "patches",
types.KindIssues: "issues",
types.KindReplies: "replies",
types.KindStatus: "status_updates",
types.KindStatus: "status",
types.KindProblemTracker: "problem_trackers",
types.KindReporting: "reportings",
types.KindLabel: "labels",
Expand All @@ -49,10 +49,10 @@ var KindToCollectionName = map[types.Kind]string{
types.KindTorrentComment: "torrent_comments",
types.KindCoinJoinPool: "coin_join_pools",
types.KindCommunityPostApproval: "community_post_approvals",
types.KindJobRequest: "job_requests",
types.KindJobResult: "job_results",
types.KindJobFeedback: "job_feedbacks",
types.KindGroupControlEvents: "group_control_events",
types.KindJobRequest: "dvm",
types.KindJobResult: "dvm",
types.KindJobFeedback: "dvm",
types.KindGroups: "groups",
types.KindZapGoal: "zap_goals",
types.KindTidalLogin: "tidal_logins",
types.KindZapRequest: "zap_requests",
Expand Down Expand Up @@ -93,7 +93,32 @@ var KindToCollectionName = map[types.Kind]string{
types.KindShortFormPortraitVideoEvent: "short_form_portrait_video_events",
types.KindVideoViewEvent: "video_view_events",
types.KindCommunityDefinition: "community_definitions",
types.KindGroupMetadataEvents: "group_metadata_events",
types.KindGroupsMetadata: "groups_metadata",
}

func getCollectionName(k types.Kind) string {
collName, ok := KindToCollectionName[k]
if ok {
return collName
}

if k >= 9000 && k <= 9030 {
return "groups"
}

if k >= 1630 && k <= 1633 {
return "status"
}

if k >= 39000 && k <= 39009 {
return "groups_metadata"
}

if k >= 5000 && k <= 5999 || k >= 6000 && k <= 6999 || k == 7000 {
return "dvm"
}

return "unknown"
}

type Handler struct {
Expand Down
25 changes: 20 additions & 5 deletions handler/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)

var possibleKinds = []types.Kind{
types.KindUserMetadata,
types.KindShortTextNote,
types.KindZap,
types.KindRelayListMetadata,
}

type filterQuery struct {
Tags map[string][]string

Expand Down Expand Up @@ -37,16 +44,24 @@ func (h *Handler) HandleReq(fs filter.Filters) ([]event.Event, error) {
Limit: f.Limit,
}

uniqueKinds := removeDuplicateKind(f.Kinds)
for _, k := range uniqueKinds {
queryKinds[k] = append(queryKinds[k], qf)
if len(f.Kinds) != 0 {
uniqueKinds := removeDuplicateKind(f.Kinds)
for _, k := range uniqueKinds {
queryKinds[k] = append(queryKinds[k], qf)
}
} else {
//! we query most requested kinds if there is no kind provided.
// FIX: any better way?
for _, k := range possibleKinds {
queryKinds[k] = append(queryKinds[k], qf)
}
}
}

var finalResult []event.Event

for kind, filters := range queryKinds {
collection := h.db.Client.Database(h.db.DBName).Collection(KindToCollectionName[kind])
collection := h.db.Client.Database(h.db.DBName).Collection(getCollectionName(kind))
for _, f := range filters {
query, opts, err := h.FilterToQuery(&f)
if err != nil {
Expand Down Expand Up @@ -84,7 +99,7 @@ func removeDuplicateKind(intSlice []types.Kind) []types.Kind {
}

func (h *Handler) FilterToQuery(fq *filterQuery) (bson.D, *options.FindOptions, error) {
var query bson.D
query := make(bson.D, 0)
opts := options.Find()

// Filter by IDs
Expand Down
5 changes: 0 additions & 5 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ devtools:
go install mvdan.cc/gofumpt@latest
go install github.com/volatiletech/sqlboiler/v4@latest
go install github.com/volatiletech/sqlboiler/v4/drivers/sqlboiler-psql@latest
# TODO ::: go-migrate

### Testing
unit-test:
Expand Down Expand Up @@ -42,8 +41,4 @@ compose-up:
compose-down:
docker-compose down

### sqlBoiler
models-generate:
sqlboiler psql

.PHONY: build
20 changes: 0 additions & 20 deletions server/http/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package http
import (
"context"
"net/http"
"runtime"
)

type status string
Expand All @@ -15,14 +14,6 @@ const (
statusTimeout status = "Timeout during health check"
)

type system struct {
Version string `json:"version"`
GoroutinesCount int `json:"goroutines_count"`
TotalAllocBytes uint64 `json:"total_alloc_bytes"`
HeapObjectsCount uint64 `json:"heap_objects_count"`
AllocBytes uint64 `json:"alloc_bytes"`
}

type service struct {
Name string `json:"name"`
Status bool `json:"status"`
Expand All @@ -32,22 +23,11 @@ type service struct {
type healthResponse struct {
Status status `json:"status"`
Database service `json:"databse"`
System system `json:"system"`
}

func (s *Server) healthHandler(w http.ResponseWriter, _ *http.Request) {
ms := runtime.MemStats{}
runtime.ReadMemStats(&ms)

resp := healthResponse{
Status: statusOK,
System: system{
Version: runtime.Version(),
GoroutinesCount: runtime.NumGoroutine(),
TotalAllocBytes: ms.Alloc,
HeapObjectsCount: ms.HeapObjects,
AllocBytes: ms.Alloc,
},
Database: service{
Name: "mongo_db",
},
Expand Down
2 changes: 1 addition & 1 deletion server/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func New(cfg Config, db *database.Database) (*Server, error) {
}

r.HandleFunc("/health", s.healthHandler).Methods("GET")
r.Handle("/metrics", promhttp.Handler())
r.Handle("/metrics", promhttp.Handler()).Methods("GET")

return s, nil
}
Expand Down
12 changes: 9 additions & 3 deletions server/websocket/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (s *Server) readLoop(conn *websocket.Conn) {

s.metrics.Connections.Dec()

client, ok := s.conns[conn]
if ok {
s.metrics.Subscriptions.Sub(float64(len(client.subs)))
}

delete(s.conns, conn)

s.mu.Unlock()
Expand Down Expand Up @@ -160,7 +165,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) {
return
}

if len(msg.Filters) > s.config.Limitation.MaxFilters {
if len(msg.Filters) >= s.config.Limitation.MaxFilters {
_ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of filters is: %d",
s.config.Limitation.MaxFilters)))

Expand All @@ -169,7 +174,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) {
return
}

if s.config.Limitation.MaxSubidLength < len(msg.SubscriptionID) {
if s.config.Limitation.MaxSubidLength <= len(msg.SubscriptionID) {
_ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of sub id is: %d",
s.config.Limitation.MaxSubidLength)))

Expand All @@ -188,7 +193,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) {
return
}

if len(client.subs) > s.config.Limitation.MaxSubscriptions {
if len(client.subs) >= s.config.Limitation.MaxSubscriptions {
_ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: max limit of subs is: %d",
s.config.Limitation.MaxSubscriptions)))

Expand All @@ -199,6 +204,7 @@ func (s *Server) handleReq(conn *websocket.Conn, m message.Message) {

res, err := s.handlers.HandleReq(msg.Filters)
if err != nil {
log.Println(err.Error())
_ = conn.WriteMessage(1, message.MakeNotice(fmt.Sprintf("error: can't process REQ message: %s", err.Error())))
status = databaseFail

Expand Down
10 changes: 5 additions & 5 deletions types/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
KindPatches Kind = 1617
KindIssues Kind = 1621
KindReplies Kind = 1622
KindStatus Kind = 1630 // to 1633. support: todo.
KindStatus Kind = 1630
KindProblemTracker Kind = 1971
KindReporting Kind = 1984
KindLabel Kind = 1985
Expand All @@ -56,10 +56,10 @@ const (
KindTorrentComment Kind = 2004
KindCoinJoinPool Kind = 2022
KindCommunityPostApproval Kind = 4550
KindJobRequest Kind = 5000 // to 5999. support: todo.
KindJobResult Kind = 6000 // to 6999. support: todo.
KindJobRequest Kind = 5000
KindJobResult Kind = 6000
KindJobFeedback Kind = 7000
KindGroupControlEvents Kind = 9000 // to 9030. support: todo.
KindGroups Kind = 9000
KindZapGoal Kind = 9041
KindTidalLogin Kind = 9467
KindZapRequest Kind = 9734
Expand Down Expand Up @@ -127,7 +127,7 @@ const (
KindShortFormPortraitVideoEvent Kind = 34236
KindVideoViewEvent Kind = 34237
KindCommunityDefinition Kind = 34550
KindGroupMetadataEvents Kind = 39000 // to 39009. support: todo.
KindGroupsMetadata Kind = 39000
)

// IsRegular checks if the given kind is in Regular range.
Expand Down
4 changes: 2 additions & 2 deletions version.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import "fmt"
var (
major = 0
minor = 0
patch = 1
meta = ""
patch = 3
meta = "beta"
)

func StringVersion() string {
Expand Down

0 comments on commit 120bfe5

Please # to comment.