-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path.zflai_elasticsearch_store
103 lines (84 loc) · 3.65 KB
/
.zflai_elasticsearch_store
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Copyright (c) 2018 Sebastian Gniazdowski
#
# Sends logs from $3 to ElasticSearch database.
#
# $1 - target database
# $2 - target table
# $3 - name of array that holds entries to send
# Traps set while this code runs stay local to it
setopt localtraps

# Translates the (lower-cased) column type found in the abstract table
# definition into the field type name written into the index mapping.
# Types not listed here are handled separately when the mapping is built.
local -A field_type_map
field_type_map=(
    'integer'   'integer'
    'long'      'long'
    'float'     'float'
    'double'    'double'
    'date'      'date'
    'boolean'   'boolean'
    'real'      'double'
    'datetime'  'date'
    'bool'      'boolean'
)

# Pull in the shared library unless it is flagged as already sourced
if [[ "$ZFLAI_LIBS_SOURCED" != "1" ]]; then
    source "${ZFLAI_SRC_DIR}/zflai_lib.zsh"
fi

# Positional arguments: database id, table name, and the reference to the
# entries array (later expanded indirectly with the (P) flag)
local __db="$1"
local __table="$2"
local __array="$3[@]"

# Connection settings read from DB_DEFS; a trailing slash is removed from the host
local __db_host="${DB_DEFS[${__db}_<access>_host]%/}"
local __db_index="${DB_DEFS[${__db}_<access>_index]}"
local __db_type="${DB_DEFS[${__db}_<access>_type]}"

# Scratch variables used by the loops further down
local __it __it2 __vn __val __ts __text

# File that receives the stderr output of the curl upload commands
local __log="${XDG_CACHE_HOME:-$HOME/.config}/zflai/zflai.log"

# Filled in by the table-definition lookup below
# NOTE(review): presumably both names are written by the helper — confirm in zflai_lib.zsh
local __table_def_param_name
integer __have_table_def=0

# __idx: numeric prefix of a key; __coidx: offset subtracted from it when
# indexing the fields of an entry
integer __idx
integer __coidx=1

# Resolve the table name and substitute it for the %TABLE% placeholder
local __table_resolved
.zflai_resolve "$__table" __table_resolved
__db_type="${__db_type//\%TABLE\%/$__table_resolved}"

local -a __splitted __keys

# Search for definition of the target table; on success load the keys of
# the definition hash, sorted in ascending numeric-aware order
if ! { .zflai_get_abstract_table_for "$__db" "$__table" "tspec_" __table_def_param_name __have_table_def && \
        __keys=( "${(okn@)${(Pk@)__table_def_param_name}}" ); }; then
    .zflai_run_log "ElasticSearch-Storage: Error: No abstract definition of table \`${__table}'" \
        "(general or for database \`${__db}'), aborting."
    return 1
fi
# Create index with mappings if it doesn't exist.
# The HEAD request fails (curl --fail) when the index URL does not answer
# with a success status, which triggers the creation below.
if ! command curl --silent --show-error --fail --head -o /dev/null "$__db_host/$__db_index"; then
    # Opening part of the mapping document; one property line per table
    # column is appended to it in the loop
    local mapping='{
"mappings": {
"_doc": {
"properties": {
"write_moment": { "type": "double" },'$'\n'

    for __it2 in "${__keys[@]}"; do
        # Name of the hash element holding this column's declared type
        __vn="${__table_def_param_name}[$__it2]"

        # Known types are translated through field_type_map (lower-cased lookup)
        __val="${field_type_map[${(L)${(P)__vn}}]}"

        # Character-like types (matched case-insensitively by prefix) become "text"
        if [[ -z "$__val" && "${(P)__vn}" = (#i)(varchar|char|string)* ]]; then
            __val="text"
        fi

        # Property name is the key without its leading "<digits>-" prefix;
        # an unrecognised type is passed through unchanged
        mapping+="\"${__it2##[0-9]##-}\": { \"type\": \"${__val:-${(P)__vn}}\" },"$'\n'
    done

    # Drop the comma after the last property and close all four open objects
    mapping="${mapping%,$'\n'}"$'\n'"}}}}"

    # Create the index; curl's stderr is appended to the log file
    command curl -X PUT --silent --show-error -o /dev/null 2>>!"$__log" \
        "$__db_host/$__db_index" \
        -H 'Content-Type: application/json' \
        -d"$mapping"
fi
# Skip timestamp from columns / hash __keys
local first_key="${__keys[1]}"
[[ "$first_key" = *time* ]] && shift __keys || __coidx=0
for __it in "${(P)__array}"; do
__splitted=( "${(@s:|:)__it}" )
__ts="${${(@s.::.)__splitted[1]}[-1]}"
shift __splitted
__splitted=( "${__splitted[@]//(#m)*/${${MATCH# }% }}" )
# Optional timestamp field, taken not from message text
# (which is currently hold in `__splitted' array), but from
# additional field within protocol (this is done to optimize,
# no additional read/writes & time querying on main-zsh side).
[[ "$first_key" = *time* ]] && __text="\"timestamp\": \"${__ts% }\","$'\n' || __text=""
# Epoch time with much greater precission, however read HERE, NOW,
# just to provide ordering field, so that logs can be well sorted
__text+="\"write_moment\": \"${EPOCHREALTIME:-0}\","$'\n'
for __it2 in "${__keys[@]}"; do
__idx="${(M)__it2##[0-9]##}"
__text+="\"${__it2##[0-9]##-}\": \"${__splitted[__idx-__coidx]}\","$'\n'
done
__text="${__text%,$'\n'}"$'\n'
command curl -X POST --silent --show-error -o /dev/null 2>>!"$__log" "$__db_host/$__db_index/_doc" -H 'Content-Type: application/json' \
-d$'{\n'"$__text"$'}\n'
done
# vim:ft=zsh:et