Fix memory leaks #553

Merged on Jan 13, 2021
Changes from all commits
32 commits
2c02f5b
clear dag error when batching
DvirDukhan Jan 4, 2021
525469a
cleanup after client disconnect
DvirDukhan Jan 4, 2021
da2032c
fixed invalid write in tflite error. moved explicitly to RedisModule_…
DvirDukhan Jan 4, 2021
b909c05
always set model run context in dag op before cleanup
DvirDukhan Jan 4, 2021
8df0ff2
tflite - always free input tensors copies after model run
DvirDukhan Jan 4, 2021
dc18f0e
removed auto memory
DvirDukhan Jan 5, 2021
d6f87e1
fixed v0 script decode
DvirDukhan Jan 5, 2021
15c4df2
fixed v1 script decode
DvirDukhan Jan 5, 2021
176dec0
free sds in module info
DvirDukhan Jan 5, 2021
a49c617
sds wip
DvirDukhan Jan 5, 2021
a3642f8
torch - always free input tensors
DvirDukhan Jan 5, 2021
8dcc2c5
better handling of disconnect client via redismodule_blockclient. rem…
DvirDukhan Jan 5, 2021
bbc897e
Merge branch 'master' into fix_memory_leaks
DvirDukhan Jan 6, 2021
e1eedbb
always set script run context in dag op before error handling
DvirDukhan Jan 6, 2021
7f491a1
tensorflow error cleanup
DvirDukhan Jan 9, 2021
6a3f8fb
changed sds to redis string
DvirDukhan Jan 9, 2021
747f171
removed unwanted tensor shallow copy
DvirDukhan Jan 9, 2021
2ee3593
cosmetics
DvirDukhan Jan 9, 2021
28be479
fixed AOF char buffer
DvirDukhan Jan 12, 2021
5abc288
tflite release tensor
DvirDukhan Jan 12, 2021
6eeb6b4
torch release tensor
DvirDukhan Jan 12, 2021
d13b724
torch release tensor
DvirDukhan Jan 12, 2021
116d1f6
background worker refactor
DvirDukhan Jan 12, 2021
3e42324
run valgrind after each test phase
DvirDukhan Jan 12, 2021
1b97882
undo changes in tests.h
DvirDukhan Jan 12, 2021
310d61e
better code style
DvirDukhan Jan 12, 2021
43b19b0
tests fix
DvirDukhan Jan 12, 2021
236034d
make format
DvirDukhan Jan 12, 2021
a862408
fine tuned suppressions
DvirDukhan Jan 12, 2021
efb4b3d
rely only on RLTest valgrind report
DvirDukhan Jan 12, 2021
43afeb6
Removed comments
DvirDukhan Jan 13, 2021
340435d
Merge branch 'master' into fix_memory_leaks
DvirDukhan Jan 13, 2021
21 changes: 7 additions & 14 deletions opt/redis_valgrind.sup
@@ -5,6 +5,13 @@
obj:*/libtensorflow.so.*
}

{
ignore_unversioned_libs
Memcheck:Leak
...
obj:*/libtensorflow_framework.so.*
}

{
ignore_unversioned_libs
Memcheck:Leak
@@ -54,17 +61,3 @@
fun:RAI_LoadBackend
}

{
<tf-operator new>
Memcheck:Leak
...
fun:clone
}

{
<malloc>
Memcheck:Leak
fun:malloc
...
fun:clone
}
39 changes: 18 additions & 21 deletions src/DAG/dag.c
@@ -91,7 +91,7 @@ static void Dag_LoadInputsToModelRunCtx(RedisAI_RunInfo *rinfo, RAI_DagOp *curre

static void Dag_StoreOutputsFromModelRunCtx(RedisAI_RunInfo *rinfo, RAI_DagOp *currentOp) {

RAI_ContextReadLock(rinfo);
RAI_ContextWriteLock(rinfo);
const size_t noutputs = RAI_ModelRunCtxNumOutputs(currentOp->mctx);
for (size_t outputNumber = 0; outputNumber < noutputs; outputNumber++) {
RAI_Tensor *tensor = RAI_ModelRunCtxOutputTensor(currentOp->mctx, outputNumber);
@@ -177,6 +177,9 @@ void RedisAI_BatchedDagRunSession_ModelRun_Step(RedisAI_RunInfo **batched_rinfo,
if (rinfo->single_op_dag == 0)
Dag_StoreOutputsFromModelRunCtx(rinfo, currentOp);
}
// Clear the result in case of an error.
if (result == REDISMODULE_ERR)
RAI_ClearError(&err);
}
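A note on the locking change above: Dag_StoreOutputsFromModelRunCtx now takes the write lock because it mutates the shared per-DAG tensor context (it stores the run's output tensors) rather than only reading it. The sketch below shows the underlying reader/writer pattern with pthread_rwlock; it is illustrative only, and the names and types are not RedisAI's actual RAI_ContextReadLock/RAI_ContextWriteLock implementation.

#include <pthread.h>
#include <stdio.h>

/* Illustrative stand-in for the per-DAG tensor context. */
typedef struct {
    pthread_rwlock_t lock;
    const char *outputs[16];
    int noutputs;
} TensorContext;

/* Lookups only read the shared context, so a read lock is enough. */
static const char *Context_Find(TensorContext *ctx, int i) {
    pthread_rwlock_rdlock(&ctx->lock);
    const char *t = (i < ctx->noutputs) ? ctx->outputs[i] : NULL;
    pthread_rwlock_unlock(&ctx->lock);
    return t;
}

/* Storing model outputs mutates the context, so it must take the write lock. */
static void Context_StoreOutput(TensorContext *ctx, const char *tensor) {
    pthread_rwlock_wrlock(&ctx->lock);
    ctx->outputs[ctx->noutputs++] = tensor;
    pthread_rwlock_unlock(&ctx->lock);
}

int main(void) {
    TensorContext ctx = {.noutputs = 0};
    pthread_rwlock_init(&ctx.lock, NULL);
    Context_StoreOutput(&ctx, "model:out:0");
    printf("stored: %s\n", Context_Find(&ctx, 0));
    pthread_rwlock_destroy(&ctx.lock);
    return 0;
}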

/**
@@ -346,16 +349,20 @@ int RAI_DagOpBatchable(RAI_DagOp *op1, RedisAI_RunInfo *rinfo1, RAI_DagOp *op2,
return 1;
}

int RedisAI_DagDeviceComplete(RedisAI_RunInfo *rinfo) {
bool RedisAI_DagDeviceComplete(RedisAI_RunInfo *rinfo) {
return rinfo->dagDeviceCompleteOpCount == rinfo->dagDeviceOpCount;
}

int RedisAI_DagComplete(RedisAI_RunInfo *rinfo) {
bool RedisAI_DagComplete(RedisAI_RunInfo *rinfo) {
int completeOpCount = __atomic_load_n(rinfo->dagCompleteOpCount, __ATOMIC_RELAXED);

return completeOpCount == rinfo->dagOpCount;
}

bool RedisAI_DagError(RedisAI_RunInfo *rinfo) {
return __atomic_load_n(rinfo->dagError, __ATOMIC_RELAXED) != 0;
}

RAI_DagOp *RedisAI_DagCurrentOp(RedisAI_RunInfo *rinfo) {
if (rinfo->dagDeviceCompleteOpCount == rinfo->dagDeviceOpCount) {
return NULL;
@@ -364,21 +371,21 @@ RAI_DagOp *RedisAI_DagCurrentOp(RedisAI_RunInfo *rinfo) {
return rinfo->dagDeviceOps[rinfo->dagDeviceCompleteOpCount];
}

void RedisAI_DagCurrentOpInfo(RedisAI_RunInfo *rinfo, int *currentOpReady,
int *currentOpBatchable) {
void RedisAI_DagCurrentOpInfo(RedisAI_RunInfo *rinfo, bool *currentOpReady,
bool *currentOpBatchable) {
RAI_DagOp *currentOp_ = RedisAI_DagCurrentOp(rinfo);

*currentOpReady = 0;
*currentOpBatchable = 0;
*currentOpReady = false;
*currentOpBatchable = false;

if (currentOp_ == NULL) {
return;
}

if (currentOp_->mctx && currentOp_->mctx->model->opts.batchsize > 0) {
*currentOpBatchable = 1;
*currentOpBatchable = true;
}
*currentOpReady = 1;
*currentOpReady = true;
// If this is a single op dag, the op is definitely ready.
if (rinfo->single_op_dag == 1)
return;
@@ -389,7 +396,7 @@ void RedisAI_DagCurrentOpInfo(RedisAI_RunInfo *rinfo, int *currentOpReady,
for (int i = 0; i < n_inkeys; i++) {
if (AI_dictFind(rinfo->dagTensorsContext, currentOp_->inkeys[i]) == NULL) {
RAI_ContextUnlock(rinfo);
*currentOpReady = 0;
*currentOpReady = false;
return;
}
}
@@ -577,7 +584,6 @@ static void _ModelSingleOp_PersistTensors(RedisModuleCtx *ctx, RAI_DagOp *op) {
const size_t noutputs = RAI_ModelRunCtxNumOutputs(op->mctx);
for (size_t outputNumber = 0; outputNumber < noutputs; outputNumber++) {
RAI_Tensor *tensor = RAI_ModelRunCtxOutputTensor(op->mctx, outputNumber);
tensor = tensor ? RAI_TensorGetShallowCopy(tensor) : NULL;
if (tensor)
_StoreTensorInKeySpace(ctx, tensor, op->outkeys[outputNumber], false);
}
@@ -587,7 +593,6 @@ static void _ScriptSingleOp_PersistTensors(RedisModuleCtx *ctx, RAI_DagOp *op) {
const size_t noutputs = RAI_ScriptRunCtxNumOutputs(op->sctx);
for (size_t outputNumber = 0; outputNumber < noutputs; outputNumber++) {
RAI_Tensor *tensor = RAI_ScriptRunCtxOutputTensor(op->sctx, outputNumber);
tensor = tensor ? RAI_TensorGetShallowCopy(tensor) : NULL;
if (tensor)
_StoreTensorInKeySpace(ctx, tensor, op->outkeys[outputNumber], false);
}
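The two hunks above drop the extra RAI_TensorGetShallowCopy taken before _StoreTensorInKeySpace; presumably the store path already takes its own reference, so the additional shallow copy bumped the tensor's refcount with no matching release and leaked it. A minimal sketch of that ownership rule, using a hypothetical refcounted tensor rather than RedisAI's actual tensor API:

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical refcounted tensor, for illustration only. */
typedef struct {
    int refcount;
} Tensor;

static Tensor *Tensor_ShallowCopy(Tensor *t) {
    t->refcount++; /* a "shallow copy" is just another reference */
    return t;
}

static void Tensor_Release(Tensor *t) {
    if (--t->refcount == 0)
        free(t);
}

/* The keyspace store takes its own reference on the tensor it keeps. */
static void StoreInKeySpace(Tensor *t) { Tensor_ShallowCopy(t); }

int main(void) {
    Tensor *t = calloc(1, sizeof(*t));
    t->refcount = 1;         /* reference owned by the run context */

    StoreInKeySpace(t);      /* keyspace takes its own reference -> 2 */
    Tensor_Release(t);       /* run context drops its reference  -> 1 */

    /* Taking an extra shallow copy before the store would leave a third
     * reference that nothing ever releases: a leak. */
    printf("remaining refcount: %d\n", t->refcount);
    Tensor_Release(t);       /* keyspace eventually releases too -> freed */
    return 0;
}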
@@ -600,7 +605,6 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc

if (RAI_GetErrorCode(rinfo->err) == RAI_EDAGRUN) {
RedisModule_ReplyWithError(ctx, RAI_GetErrorOneLine(rinfo->err));
RAI_FreeRunInfo(rinfo);
return REDISMODULE_ERR;
}
int dag_error = 0;
@@ -610,7 +614,6 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc

if (*rinfo->timedOut) {
RedisModule_ReplyWithSimpleString(ctx, "TIMEDOUT");
RAI_FreeRunInfo(rinfo);
return REDISMODULE_OK;
}

@@ -701,7 +704,6 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
if (rinfo->single_op_dag == 0) {
RedisModule_ReplySetArrayLength(ctx, rinfo->dagReplyLength);
}
RAI_FreeRunInfo(rinfo);
return REDISMODULE_ERR;
}

@@ -718,7 +720,6 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
}
}

RAI_FreeRunInfo(rinfo);
return REDISMODULE_OK;
}

@@ -746,11 +747,7 @@ int RedisAI_DagRun_IsKeysPositionRequest_ReportKeys(RedisModuleCtx *ctx, RedisMo
return REDISMODULE_OK;
}

void RunInfo_FreeData(RedisModuleCtx *ctx, void *rinfo) {}

void RedisAI_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
RedisModule_Log(ctx, "warning", "Blocked client %p disconnected!", (void *)bc);
}
void RunInfo_FreeData(RedisModuleCtx *ctx, void *rinfo) { RAI_FreeRunInfo(rinfo); }

// Add Shallow copies of the DAG run info to the devices' queues.
// Return REDISMODULE_OK in case of success, REDISMODULE_ERR if (at least) one insert op had
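A note on the RunInfo_FreeData change above: the run info is now released from the blocked client's free-privdata callback instead of on each reply, timeout, or error path, so it is freed exactly once even if the client disconnects before a reply is sent (which also makes the separate RedisAI_Disconnected callback unnecessary). The sketch below shows that general Redis module pattern with a hypothetical command and payload; it is not RedisAI's code, only the shape of the BlockClient/UnblockClient/free-privdata contract.

#include "redismodule.h"

/* Hypothetical per-request payload, standing in for RedisAI_RunInfo. */
typedef struct {
    long long result;
} RunPayload;

/* free_privdata callback: the single place the payload is released,
 * whether the client got its reply or disconnected first. */
static void Payload_FreeData(RedisModuleCtx *ctx, void *privdata) {
    REDISMODULE_NOT_USED(ctx);
    RedisModule_Free(privdata);
}

static int Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);
    RunPayload *p = RedisModule_GetBlockedClientPrivateData(ctx);
    return RedisModule_ReplyWithLongLong(ctx, p->result);
}

static int Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);
    return RedisModule_ReplyWithSimpleString(ctx, "TIMEDOUT");
}

/* Block the client and hand the payload to UnblockClient (normally done from
 * a worker thread). Redis calls Reply and then Payload_FreeData; if the client
 * disconnected meanwhile, Reply is skipped but Payload_FreeData still runs. */
static int Cmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx, Reply, Timeout, Payload_FreeData, 1000);
    RunPayload *p = RedisModule_Alloc(sizeof(*p));
    p->result = 42;
    RedisModule_UnblockClient(bc, p);
    return REDISMODULE_OK;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);
    if (RedisModule_Init(ctx, "blockdemo", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
        return REDISMODULE_ERR;
    return RedisModule_CreateCommand(ctx, "blockdemo.run", Cmd, "", 0, 0, 0);
}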
24 changes: 14 additions & 10 deletions src/DAG/dag.h
@@ -19,17 +19,25 @@
* successfully. Since rinfo carries information on what queue
* it has been placed in, there's no need to pass the device identifier.
* @param rinfo context in which RedisAI blocking commands operate.
* @return nonzero if all ops are complete for device, 0 otherwise
* @return true if all ops are complete for device, false otherwise
*/
int RedisAI_DagDeviceComplete(RedisAI_RunInfo *rinfo);
bool RedisAI_DagDeviceComplete(RedisAI_RunInfo *rinfo);

/**
* Get whether all DAG ops have been executed successfully irrespective
* of the device, i.e. if the DAG has been completely executed.
* @param rinfo context in which RedisAI blocking commands operate.
* @return nonzero of all ops in DAG are complete, 0 otherwise
* @return true if all ops in DAG are complete, false otherwise
*/
int RedisAI_DagComplete(RedisAI_RunInfo *rinfo);
bool RedisAI_DagComplete(RedisAI_RunInfo *rinfo);

/**
* @brief Get an indication if an error happened during the dag run.
*
* @param rinfo context in which RedisAI blocking commands operate.
* @return true if there was an error
*/
bool RedisAI_DagError(RedisAI_RunInfo *rinfo);
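The new RedisAI_DagError predicate polls the shared error flag with a relaxed atomic load, letting workers on other device queues notice that some op already failed and stop processing the rest of the DAG. Below is a self-contained sketch of that flag pattern using the same GCC/Clang __atomic builtins; the surrounding types are illustrative, not RedisAI's run-info structures.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

/* Illustrative stand-in: one error flag shared by all device queues. */
static int dagError = 0;

static bool DagError(void) {
    return __atomic_load_n(&dagError, __ATOMIC_RELAXED) != 0;
}

static void *DeviceWorker(void *arg) {
    (void)arg;
    /* An op failed on this device: publish the error flag. */
    __atomic_store_n(&dagError, 1, __ATOMIC_RELAXED);
    return NULL;
}

int main(void) {
    pthread_t worker;
    pthread_create(&worker, NULL, DeviceWorker, NULL);
    pthread_join(&worker, NULL);

    /* Other workers poll the flag and skip their remaining ops. */
    if (DagError())
        puts("DAG already failed elsewhere, skipping remaining ops");
    return 0;
}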

/**
* Get current DAG op for the given device. An op is current if it's
@@ -50,7 +58,8 @@ RAI_DagOp *RedisAI_DagCurrentOp(RedisAI_RunInfo *rinfo);
* a MODELRUN and is BATCHSIZE greater than zero
* @return
*/
void RedisAI_DagCurrentOpInfo(RedisAI_RunInfo *rinfo, int *currentOpReady, int *currentOpBatchable);
void RedisAI_DagCurrentOpInfo(RedisAI_RunInfo *rinfo, bool *currentOpReady,
bool *currentOpBatchable);

/**
* Get batching information about a DAG op.
@@ -142,9 +151,4 @@ int DAG_InsertDAGToQueue(RedisAI_RunInfo *rinfo);
*/
void RunInfo_FreeData(RedisModuleCtx *ctx, void *rinfo);

/**
* @brief A callback to send to BlockClient.
*/
void RedisAI_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);

#endif /* SRC_DAG_H_ */
18 changes: 12 additions & 6 deletions src/backends.c
@@ -39,6 +39,7 @@ RedisModuleString *RAI_GetBackendsPath(RedisModuleCtx *ctx) {
RedisModuleString *module_path = RAI_GetModulePath(ctx);
backends_path = RedisModule_CreateStringPrintf(ctx, "%s/backends",
RedisModule_StringPtrLen(module_path, NULL));
RedisModule_FreeString(ctx, module_path);
}

return backends_path;
@@ -422,21 +423,26 @@ int RAI_LoadBackend(RedisModuleCtx *ctx, int backend, const char *path) {
RedisModuleString *backends_path = RAI_GetBackendsPath(ctx);
fullpath = RedisModule_CreateStringPrintf(
ctx, "%s/%s", RedisModule_StringPtrLen(backends_path, NULL), path);
RedisModule_FreeString(ctx, backends_path);
}

int ret;
switch (backend) {
case RAI_BACKEND_TENSORFLOW:
return RAI_LoadBackend_TensorFlow(ctx, RedisModule_StringPtrLen(fullpath, NULL));
ret = RAI_LoadBackend_TensorFlow(ctx, RedisModule_StringPtrLen(fullpath, NULL));
break;
case RAI_BACKEND_TFLITE:
return RAI_LoadBackend_TFLite(ctx, RedisModule_StringPtrLen(fullpath, NULL));
ret = RAI_LoadBackend_TFLite(ctx, RedisModule_StringPtrLen(fullpath, NULL));
break;
case RAI_BACKEND_TORCH:
return RAI_LoadBackend_Torch(ctx, RedisModule_StringPtrLen(fullpath, NULL));
ret = RAI_LoadBackend_Torch(ctx, RedisModule_StringPtrLen(fullpath, NULL));
break;
case RAI_BACKEND_ONNXRUNTIME:
return RAI_LoadBackend_ONNXRuntime(ctx, RedisModule_StringPtrLen(fullpath, NULL));
ret = RAI_LoadBackend_ONNXRuntime(ctx, RedisModule_StringPtrLen(fullpath, NULL));
break;
}

return REDISMODULE_ERR;
RedisModule_FreeString(ctx, fullpath);
return ret;
}

int RAI_LoadDefaultBackend(RedisModuleCtx *ctx, int backend) {
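Finally, the backends.c changes follow one ownership rule for RedisModuleString: whoever creates a string with RedisModule_CreateStringPrintf frees it with RedisModule_FreeString once it has been used, which is why RAI_LoadBackend now funnels every backend through a single exit point instead of returning early with fullpath (and backends_path) still alive. A small hedged sketch of that create/use/free pattern; the helper below is hypothetical, and only the RedisModule calls are the real API.

#include "redismodule.h"

/* Hypothetical helper: build "<dir>/<file>", use it, and free it before
 * returning, so no RedisModuleString escapes on any path. */
static int LoadFromDir(RedisModuleCtx *ctx, const char *dir, const char *file) {
    RedisModuleString *fullpath =
        RedisModule_CreateStringPrintf(ctx, "%s/%s", dir, file);

    const char *cpath = RedisModule_StringPtrLen(fullpath, NULL);
    RedisModule_Log(ctx, "notice", "would load backend from %s", cpath);
    int ret = REDISMODULE_OK; /* result of the (hypothetical) backend load */

    /* Single exit point: the string is always released, success or failure. */
    RedisModule_FreeString(ctx, fullpath);
    return ret;
}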