//
// RSCloudModeManager.m
// Rudder
//
// Created by Desu Sai Venkat on 09/08/22.
// Copyright © 2022 Rudder Labs India Pvt Ltd. All rights reserved.
//
#import "RSConfig.h"
#import "RSLogger.h"
#import "RSCloudModeManager.h"
#import "RSNetworkManager.h"
#import "RSNetworkResponse.h"
#import "RSMetricsReporter.h"
/**
 * Background processor that periodically reads persisted events from the
 * database, serialises them into a single batch payload and posts that
 * payload to BATCH_ENDPOINT through the network manager.
 */
@implementation RSCloudModeManager

/**
 * Stores the collaborators handed in by the caller and creates the private
 * serial queue on which the processor loop runs.
 *
 * @param config              Supplies dbCountThreshold, flushQueueSize and sleepTimeout.
 * @param dbPersistentManager Used to fetch, update and clear persisted events.
 * @param networkManager      Used to send the batch payload.
 * @param lock                Held while the database is read and updated in each
 *                            processor iteration (presumably shared with other
 *                            database users; confirm against the caller).
 * @return The initialised manager, or nil if [super init] fails.
 */
- (instancetype)initWithConfig:(RSConfig *) config andDBPersistentManager:(RSDBPersistentManager *) dbPersistentManager andNetworkManager:(RSNetworkManager *) networkManager andLock: (NSLock *) lock {
    self = [super init];
    if (self) {
        self->dbPersistentManager = dbPersistentManager;
        self->networkManager = networkManager;
        self->config = config;
        self->lock = lock;
        self->cloud_mode_processor_queue = dispatch_queue_create("com.rudder.RSCloudModeManager", DISPATCH_QUEUE_SERIAL);
    }
    return self;
}

/**
 * Starts the processor loop asynchronously on the private queue.
 *
 * Each iteration, under the lock: trims old events down to dbCountThreshold,
 * fetches up to flushQueueSize events and, when either a full batch is
 * available or at least one event has waited sleepTimeout iterations, sends
 * them. On NETWORK_SUCCESS the sent events are marked
 * CLOUD_MODE_PROCESSING_DONE and cleared from the database.
 *
 * After releasing the lock the loop:
 *  - sleeps one second when nothing was sent,
 *  - stops permanently on WRONG_WRITE_KEY or INVALID_URL,
 *  - sleeps |sleepCount - sleepTimeout| seconds on NETWORK_ERROR before retrying.
 */
- (void) startCloudModeProcessor {
    __weak RSCloudModeManager *weakSelf = self;
    dispatch_async(cloud_mode_processor_queue, ^{
        RSCloudModeManager *strongSelf = weakSelf;
        if (strongSelf == nil) {
            // The manager was deallocated before this block got to run.
            // Unlike a message send, `->` on a nil object pointer crashes,
            // so bail out before touching any ivar.
            return;
        }
        [RSLogger logDebug:@"RSCloudModeManager: CloudModeProcessor: Starting the Cloud Mode Processor"];
        int sleepCount = 0;
        while (YES) {
            // This block never returns to GCD, so the queue's own autorelease
            // pool is never drained. Give every iteration its own pool so the
            // payload / log strings created below are released promptly.
            @autoreleasepool {
                [strongSelf->lock lock];
                RSNetworkResponse* response = nil;
                [strongSelf->dbPersistentManager clearOldEventsWithThreshold: strongSelf->config.dbCountThreshold];
                [RSLogger logDebug:@"RSCloudModeManager: CloudModeProcessor: Fetching events to flush to server"];
                RSDBMessage* dbMessage = [strongSelf->dbPersistentManager fetchEventsFromDB:(strongSelf->config.flushQueueSize) ForMode:CLOUDMODE];
                // Flush when a full batch is available, or when some events exist and we have waited long enough.
                if ((dbMessage.messages.count >= strongSelf->config.flushQueueSize) || (dbMessage.messages.count > 0 && (sleepCount >= strongSelf->config.sleepTimeout))) {
                    // Note: this call also narrows dbMessage.messageIds to the events that actually fit in the payload.
                    NSString* payload = [RSCloudModeManager getPayloadFromMessages:dbMessage];
                    [RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Payload: %@", payload]];
                    [RSLogger logInfo:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: EventCount: %lu", (unsigned long)dbMessage.messageIds.count]];
                    [RSMetricsReporter report:CM_EVENT forMetricType:COUNT withProperties:@{TYPE: MESSAGES} andValue:(float)dbMessage.messages.count];
                    response = [strongSelf->networkManager sendNetworkRequest:payload toEndpoint:BATCH_ENDPOINT withRequestMethod:POST];
                    if (response.state == NETWORK_SUCCESS) {
                        [RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Updating status as CLOUDMODEPROCESSING DONE for events (%@)",[RSUtils getCSVString:dbMessage.messageIds]]];
                        // Count only the events that were part of the payload; the batch may have
                        // been truncated at MAX_BATCH_SIZE, in which case messages.count overstates it.
                        [RSMetricsReporter report:CM_ATTEMPT_SUCCESS forMetricType:COUNT withProperties:nil andValue:(float)dbMessage.messageIds.count];
                        [strongSelf->dbPersistentManager updateEventsWithIds:dbMessage.messageIds withStatus:CLOUD_MODE_PROCESSING_DONE];
                        [strongSelf->dbPersistentManager clearProcessedEventsFromDB];
                        sleepCount = 0;
                    }
                }
                [strongSelf->lock unlock];
                [RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: cloudModeSleepCount: %d", sleepCount]];
                sleepCount += 1;
                if (response == nil) {
                    // Nothing was sent in this iteration; poll again in one second.
                    sleep(1);
                } else if (response.state == WRONG_WRITE_KEY) {
                    [RSLogger logError:@"RSCloudModeManager: CloudModeProcessor: Wrong WriteKey. Aborting the Cloud Mode Processor"];
                    break;
                } else if (response.state == INVALID_URL) {
                    [RSLogger logError:@"RSCloudModeManager: CloudModeProcessor: Invalid Data Plane URL. Aborting the Cloud Mode Processor"];
                    [RSMetricsReporter report:CM_ATTEMPT_ABORT forMetricType:COUNT withProperties:@{TYPE: DATA_PLANE_URL_INVALID} andValue:1];
                    break;
                } else if (response.state == NETWORK_ERROR) {
                    int retryDelaySeconds = abs(sleepCount - strongSelf->config.sleepTimeout);
                    [RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: CloudModeProcessor: Retrying in: %d s", retryDelaySeconds]];
                    [RSMetricsReporter report:CM_ATTEMPT_RETRY forMetricType:COUNT withProperties:nil andValue:1];
                    // Sleep in whole seconds: multiplying by 1000000 in `int` for usleep() can
                    // overflow, and POSIX lets usleep() reject values of one second or more.
                    sleep((unsigned int)retryDelaySeconds);
                }
            }
        }
    });
}

/**
 * Builds the JSON batch payload for the given fetched events.
 *
 * The payload has the form {"sentAt":"<ts>","batch":[<event>,...]} where every
 * event additionally receives a "sentAt" member. Events are appended in order
 * until adding the next one would push the UTF-8 size past MAX_BATCH_SIZE; that
 * event and all following ones are left out of the payload.
 *
 * Side effect: dbMessage.messageIds is replaced with the ids of the events
 * that made it into the payload. dbMessage.messages is left untouched.
 *
 * @param dbMessage Events (and their ids, index-aligned) fetched from the database.
 * @return An immutable copy of the JSON payload string.
 */
+ (NSString*) getPayloadFromMessages: (RSDBMessage*)dbMessage{
    NSMutableArray<NSString *>* messages = dbMessage.messages;
    NSMutableArray<NSString *>* messageIds = dbMessage.messageIds;
    NSMutableArray<NSString *> *batchMessageIds = [[NSMutableArray alloc] init];
    NSString* sentAt = [RSUtils getTimestamp];
    [RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: getPayloadFromMessages: RecordCount: %lu", (unsigned long)messages.count]];
    [RSLogger logDebug:[[NSString alloc] initWithFormat:@"RSCloudModeManager: getPayloadFromMessages: sentAtTimeStamp: %@", sentAt]];

    NSMutableString* json = [[NSMutableString alloc] init];
    [json appendString:@"{"];
    [json appendFormat:@"\"sentAt\":\"%@\",", sentAt];
    [json appendString:@"\"batch\":["];
    unsigned int totalBatchSize = [RSUtils getUTF8Length:json] + 2; // we add 2 characters ("]}") at the end

    for (int index = 0; index < messages.count; index++) {
        // Drop the event's last character and re-close it after injecting sentAt.
        // NOTE(review): assumes every stored message is a non-empty JSON object ending in '}' - confirm against the writer.
        NSString* storedEvent = messages[index];
        NSString* eventWithoutLastChar = [storedEvent substringToIndex:(storedEvent.length - 1)];
        NSString* message = [[NSString alloc] initWithFormat:@"%@,\"sentAt\":\"%@\"},", eventWithoutLastChar, sentAt];
        // add message size to batch size
        totalBatchSize += [RSUtils getUTF8Length:message];
        // check totalBatchSize
        if (totalBatchSize > MAX_BATCH_SIZE) {
            [RSLogger logDebug:[NSString stringWithFormat:@"RSCloudModeManager: getPayloadFromMessages: MAX_BATCH_SIZE reached at index: %i | Total: %i",index, totalBatchSize]];
            [RSMetricsReporter report:EVENTS_DISCARDED forMetricType:COUNT withProperties:@{TYPE: BATCH_SIZE_INVALID} andValue:1];
            break;
        }
        [json appendString:message];
        [batchMessageIds addObject:messageIds[index]];
    }

    if ([json characterAtIndex:[json length]-1] == ',') {
        // remove trailing ','
        [json deleteCharactersInRange:NSMakeRange([json length]-1, 1)];
    }
    [json appendString:@"]}"];
    // keep only the ids of the events that are part of the current batch
    dbMessage.messageIds = batchMessageIds;
    return [json copy];
}

@end