-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathss_queue.c
92 lines (80 loc) · 2.32 KB
/
ss_queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include "ss_queue.h"
#include "ss_lthread.h"
extern int vasprintf(char **, const char *, va_list);
/*
 * Initialize a work-item queue.
 *
 * Zeroes *queue, initializes its condition variable and mutex with default
 * attributes, and sets queue->name to a heap-allocated string built from the
 * printf-style format `fmt` and the variadic arguments that follow it.
 *
 * Returns 0 on success and -1 on failure. On failure every synchronization
 * primitive that was already initialized is destroyed again and queue->name
 * is NULL, so the caller has nothing to clean up.
 */
int
queue_init(struct queue *queue, const char *fmt, ...)
{
    va_list ap;
    int namelen;

    memset(queue, '\0', sizeof(*queue));
    if (pthread_cond_init(&queue->cond, NULL) != 0)
        return (-1);
    if (pthread_mutex_init(&queue->mutex, NULL) != 0) {
        /* Do not leak the condition variable initialized just above. */
        pthread_cond_destroy(&queue->cond);
        return (-1);
    }
    va_start(ap, fmt);
    namelen = vasprintf(&queue->name, fmt, ap);
    va_end(ap);
    if (namelen < 0) {
        /*
         * Only the return value reliably reports failure: the pointer
         * written by a failed vasprintf() is not guaranteed to be NULL.
         */
        queue->name = NULL;
        pthread_mutex_destroy(&queue->mutex);
        pthread_cond_destroy(&queue->cond);
        return (-1);
    }
    return (0);
}
/*
 * Append work item `wi` to the tail of `queue` and signal one waiter.
 *
 * When `wi` is a WI_INPACKET, the queue has a positive max_ttl, and the
 * current length is a non-zero multiple of 100, the queue is first swept:
 * every queued item whose INP().dtime is older than
 * (INP(wi).dtime - queue->max_ttl) is unlinked and released with wi_free().
 *
 * After the append, a length report is written to stderr whenever the queue
 * length is a non-zero multiple of 100.
 *
 * The queue mutex is held for the whole operation.
 */
void
queue_put_item(struct wi *wi, struct queue *queue)
{
    struct wi *tmpwi, *nextwi, *prevwi;
    double cutoff_time;

    pthread_mutex_lock(&queue->mutex);

    if (wi->wi_type == WI_INPACKET && queue->max_ttl > 0 && queue->length > 0
      && queue->length % 100 == 0) {
        cutoff_time = INP(wi).dtime - queue->max_ttl;
        /* prevwi tracks the last item that was kept, i.e. the live predecessor. */
        prevwi = NULL;
        /*
         * NOTE(review): the sweep reads INP(tmpwi) without checking
         * tmpwi->wi_type, so it assumes every queued item is a WI_INPACKET
         * -- confirm against the callers.
         */
        for (tmpwi = queue->head; tmpwi != NULL; tmpwi = nextwi) {
            /* Save the successor first: tmpwi may be freed below. */
            nextwi = tmpwi->next;
            if (INP(tmpwi).dtime < cutoff_time) {
                /* Unlink the expired item from its predecessor (or the head). */
                if (prevwi != NULL)
                    prevwi->next = nextwi;
                else
                    queue->head = nextwi;
                if (queue->tail == tmpwi)
                    queue->tail = prevwi;
                queue->length -= 1;
                /* Release the expired item, not the one being inserted. */
                wi_free(tmpwi);
            } else {
                prevwi = tmpwi;
            }
        }
    }

    wi->next = NULL;
    if (queue->head == NULL) {
        queue->head = wi;
        queue->tail = wi;
    } else {
        queue->tail->next = wi;
        queue->tail = wi;
    }
    queue->length += 1;
    if (queue->length > 99 && queue->length % 100 == 0)
        fprintf(stderr, "queue(%s): length %d\n", queue->name, queue->length);

    /* notify worker thread */
    pthread_cond_signal(&queue->cond);

    pthread_mutex_unlock(&queue->mutex);
}
/*
 * Remove and return the item at the head of `queue`.
 *
 * If the queue is empty the caller blocks on the queue's condition variable.
 * With return_on_wake == 0 the call keeps waiting until an item is available.
 * With return_on_wake != 0 the call returns NULL as soon as a wait returns
 * and the queue is still empty.
 *
 * The queue mutex is taken on entry and released before returning.
 */
struct wi *
queue_get_item(struct queue *queue, int return_on_wake)
{
    struct wi *item;

    pthread_mutex_lock(&queue->mutex);
    for (;;) {
        item = queue->head;
        if (item != NULL)
            break;
        pthread_cond_wait(&queue->cond, &queue->mutex);
        /* Woken up with nothing queued: give up if the caller asked for that. */
        if (queue->head == NULL && return_on_wake != 0)
            goto unlock;
    }

    /* Detach the head; an emptied queue must not keep a stale tail. */
    queue->head = item->next;
    if (item->next == NULL)
        queue->tail = NULL;
    queue->length -= 1;

unlock:
    pthread_mutex_unlock(&queue->mutex);
    return item;
}