diff --git a/docs/commands.md b/docs/commands.md
index f698342ed..3e44edfc8 100644
--- a/docs/commands.md
+++ b/docs/commands.md
@@ -934,14 +934,14 @@ Because `AI.DAGRUN` provides the `PERSIST` option it is flagged as a 'write' com
 Refer to the Redis [`READONLY` command](https://redis.io/commands/readonly) for further information about read-only cluster replicas.
 
 ## AI.INFO
-The **`AI.INFO`** command returns general module information or information about the execution a model or a script.
+The **`AI.INFO`** command returns information about the execution of a model or a script.
 
-Runtime information is collected each time that [`AI.MODELRUN`](#aimodelrun) or [`AI.SCRIPTRUN`](#aiscriptrun) is called. The information is stored locally by the executing RedisAI engine, so when deployed in a cluster each shard stores its own runtime information.
+Runtime information is collected each time that [`AI.MODELEXECUTE`](#aimodelrun) or [`AI.SCRIPTEXECUTE`](#aiscriptrun) is called. The information is stored locally by the executing RedisAI engine, so when deployed in a cluster each shard stores its own runtime information.
 
 **Redis API**
 
 ```
-AI.INFO [<key>] [RESETSTAT]
+AI.INFO <key> [RESETSTAT]
 ```
 
 _Arguments_
@@ -951,15 +951,7 @@ _Arguments_
 
 _Return_
 
-For a module genernal information: An array with alternating entries that represent the following key-value pairs:
-
-* **Version**: a string showing the current module version.
-* **Low level API Version**: a string showing the current module's low level api version.
-* **RDB Encoding version**: a string showing the current module's RDB encoding version.
-* **TensorFlow version**: a string showing the current loaded TesnorFlow backend version.
-* **ONNX version**: a string showing the current loaded ONNX Runtime backend version.
-
-For model or script runtime information: An array with alternating entries that represent the following key-value pairs:
+An array with alternating entries that represent the following key-value pairs:
 
 * **KEY**: a String of the name of the key storing the model or script value
 * **TYPE**: a String of the type of value (i.e. 'MODEL' or 'SCRIPT')
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index d3a85b471..ddcb1340d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,6 +39,7 @@ ADD_LIBRARY(redisai_obj OBJECT
     execution/parsing/parse_utils.c
     execution/run_info.c
     execution/background_workers.c
+    execution/run_queue_info.c
     execution/utils.c
     config/config.c
    execution/DAG/dag.c
@@ -88,6 +89,7 @@ ENDIF()
 IF(BUILD_ORT)
     ADD_LIBRARY(redisai_onnxruntime_obj OBJECT
         backends/onnxruntime.c
+        backends/onnx_timeout.c
         ${BACKEND_COMMON_SRC}
     )
 ENDIF()
diff --git a/src/backends/backedns_api.h b/src/backends/backedns_api.h
new file mode 100644
index 000000000..875fed7d5
--- /dev/null
+++ b/src/backends/backedns_api.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <stdint.h>
+
+/**
+ * @return The internal id of RedisAI current working thread.
+ * id range is {0, ..., <threads_count>-1}. If this is called from a non
+ * RedisAI BG thread, return -1.
+ */
+long (*RedisAI_GetThreadId)(void);
+
+/**
+ * @return The number of working threads in RedisAI. This number should be
+ * equal to the number of threads per queue (load time config) * number of devices
+ * registered in RedisAI (a new device is registered if a model is set to run on
+ * this device in AI.MODELSTORE command).
+ */
+uintptr_t (*RedisAI_GetThreadsCount)(void);
+
+/**
+ * @return The number of working threads per device queue (load time config).
+ */
+long long (*RedisAI_GetNumThreadsPerQueue)(void);
+
+/**
+ * @return The maximal number of milliseconds that a model run session should run
+ * before it is terminated forcefully (load time config).
+ * Currently supported only for onnxruntime backend.
+ */
+long long (*RedisAI_GetModelExecutionTimeout)(void);
diff --git a/src/backends/backends.c b/src/backends/backends.c
index 1a82abe72..490922462 100644
--- a/src/backends/backends.c
+++ b/src/backends/backends.c
@@ -14,8 +14,45 @@
 #include <dlfcn.h>
 #include <libgen.h>
 #include <string.h>
-
 #include "redismodule.h"
+#include "config/config.h"
+#include "execution/background_workers.h"
+
+static bool _ValidateFuncExists(RedisModuleCtx *ctx, void *func_ptr, const char *func_name,
+                                const char *backend_name, const char *path) {
+    if (func_ptr == NULL) {
+        RedisModule_Log(ctx, "warning",
+                        "Backend does not export %s. %s backend"
+                        " was not loaded from %s",
+                        func_name, backend_name, path);
+        return false;
+    }
+    return true;
+}
+
+/**
+ * @brief Export a function from RedisAI to a backend. This will set a pointer
+ * to a function that has been declared in the backend to use the corresponding
+ * function in RedisAI.
+ * @param func_name A string that identifies the function to export.
+ * @param targetFuncPtr placeholder for a function pointer coming from the
+ * backend, to be set to the corresponding function from RedisAI.
+ */
+int RAI_ExportFunc(const char *func_name, void **targetFuncPtr) {
+
+    if (strcmp("GetThreadId", func_name) == 0) {
+        *targetFuncPtr = BGWorker_GetThreadId;
+    } else if (strcmp("GetNumThreadsPerQueue", func_name) == 0) {
+        *targetFuncPtr = Config_GetNumThreadsPerQueue;
+    } else if (strcmp("GetModelExecutionTimeout", func_name) == 0) {
+        *targetFuncPtr = Config_GetModelExecutionTimeout;
+    } else if (strcmp("GetThreadsCount", func_name) == 0) {
+        *targetFuncPtr = BGWorker_GetThreadsCount;
+    } else {
+        return RedisModule_GetApi(func_name, targetFuncPtr);
+    }
+    return REDISMODULE_OK;
+}
 
 RedisModuleString *RAI_GetModulePath(RedisModuleCtx *ctx) {
     Dl_info info;
@@ -33,8 +70,9 @@ RedisModuleString *RAI_GetModulePath(RedisModuleCtx *ctx) {
 RedisModuleString *RAI_GetBackendsPath(RedisModuleCtx *ctx) {
     Dl_info info;
     RedisModuleString *backends_path = NULL;
-    if (RAI_BackendsPath != NULL) {
-        backends_path = RedisModule_CreateString(ctx, RAI_BackendsPath, strlen(RAI_BackendsPath));
+    if (Config_GetBackendsPath() != NULL) {
+        backends_path = RedisModule_CreateString(ctx, Config_GetBackendsPath(),
+                                                 strlen(Config_GetBackendsPath()));
     } else {
         RedisModuleString *module_path = RAI_GetModulePath(ctx);
         backends_path = RedisModule_CreateStringPrintf(ctx, "%s/backends",
@@ -45,7 +83,7 @@ RedisModuleString *RAI_GetBackendsPath(RedisModuleCtx *ctx) {
     return backends_path;
 }
 
-const char *RAI_BackendName(int backend) {
+const char *RAI_GetBackendName(RAI_Backend backend) {
     switch (backend) {
     case RAI_BACKEND_TENSORFLOW:
         return "TF";
@@ -66,88 +104,62 @@ int RAI_LoadBackend_TensorFlow(RedisModuleCtx *ctx, const char *path) {
     }
 
     void *handle = dlopen(path, RTLD_NOW | RTLD_LOCAL);
-
     if (handle == NULL) {
         RedisModule_Log(ctx, "warning", "Could not load TF backend from %s: %s", path, dlerror());
         return REDISMODULE_ERR;
     }
-
-    RAI_LoadedBackend backend = {0};
+    RAI_LoadedBackend backend = {0}; // Initialize all the callbacks to NULL.
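The pattern this hunk converges on repeats for every symbol a backend must export: `dlsym` the name, validate it with `_ValidateFuncExists`, and on failure jump to a single `error:` label that closes the handle. A minimal, self-contained sketch of that shape with toy names (not part of this patch):

```
#include <dlfcn.h>
#include <stddef.h>

typedef struct {
    int (*model_run)(void); /* one of the callbacks resolved from the shared object */
} ToyBackend;

static int LoadToyBackend(const char *path) {
    void *handle = dlopen(path, RTLD_NOW | RTLD_LOCAL);
    if (handle == NULL)
        return -1;
    ToyBackend backend = {0}; /* all callbacks start out NULL */
    backend.model_run = (int (*)(void))(unsigned long)dlsym(handle, "ToyModelRun");
    if (backend.model_run == NULL) /* where _ValidateFuncExists would log a warning */
        goto error;
    /* ... resolve and validate the remaining symbols the same way ... */
    return 0;

error:
    dlclose(handle); /* one cleanup point instead of a dlclose at every failure site */
    return -1;
}
```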
int (*init_backend)(int (*)(const char *, void *)); init_backend = (int (*)(int (*)(const char *, void *)))(unsigned long)dlsym(handle, "RAI_InitBackendTF"); - if (init_backend == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_InitBackendTF. TF backend not " - "loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, init_backend, "RAI_InitBackendTF", "TF", path)) { + goto error; } + // Here we use the input callback to export functions from Redis to the backend, + // by setting the backend's function pointers to the corresponding functions in Redis. init_backend(RedisModule_GetApi); backend.model_create_with_nodes = (RAI_Model * (*)(RAI_Backend, const char *, RAI_ModelOpts, size_t, const char **, size_t, const char **, const char *, size_t, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ModelCreateTF"); - if (backend.model_create_with_nodes == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelCreateTF. TF backend not " - "loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_create_with_nodes, "RAI_ModelCreateTF", "TF", + path)) { + goto error; } backend.model_free = (void (*)(RAI_Model *, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ModelFreeTF"); - if (backend.model_free == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelFreeTF. TF backend not " - "loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_free, "RAI_ModelFreeTF", "TF", path)) { + goto error; } backend.model_run = (int (*)(RAI_Model * model, RAI_ExecutionCtx * *ectxs, RAI_Error * error))( unsigned long)dlsym(handle, "RAI_ModelRunTF"); - if (backend.model_run == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelRunTF. TF backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_run, "RAI_ModelRunTF", "TF", path)) { + goto error; } backend.model_serialize = (int (*)(RAI_Model *, char **, size_t *, RAI_Error *))( (unsigned long)dlsym(handle, "RAI_ModelSerializeTF")); - if (backend.model_serialize == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelSerializeTF. TF backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_serialize, "RAI_ModelSerializeTF", "TF", path)) { + goto error; } backend.get_version = (const char *(*)(void))(unsigned long)dlsym(handle, "RAI_GetBackendVersionTF"); - if (backend.get_version == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_GetBackendVersionTF. TF backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.get_version, "RAI_GetBackendVersionTF", "TF", path)) { + goto error; } RAI_backends.tf = backend; - RedisModule_Log(ctx, "notice", "TF backend loaded from %s", path); - return REDISMODULE_OK; + +error: + dlclose(handle); + return REDISMODULE_ERR; } int RAI_LoadBackend_TFLite(RedisModuleCtx *ctx, const char *path) { @@ -163,83 +175,58 @@ int RAI_LoadBackend_TFLite(RedisModuleCtx *ctx, const char *path) { dlerror()); return REDISMODULE_ERR; } - - RAI_LoadedBackend backend = {0}; + RAI_LoadedBackend backend = {0}; // Initialize all the callbacks to NULL. 
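For context, a backend library like the TF one loaded above can also be pulled in at runtime with `AI.CONFIG LOADBACKEND`; the path below is illustrative and depends on the install layout:

```
AI.CONFIG LOADBACKEND TF backends/redisai_tensorflow/redisai_tensorflow.so
```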
int (*init_backend)(int (*)(const char *, void *)); init_backend = (int (*)(int (*)(const char *, void *)))(unsigned long)dlsym( handle, "RAI_InitBackendTFLite"); - if (init_backend == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_InitBackendTFLite. TFLITE " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, init_backend, "RAI_InitBackendTFLite", "TFLite", path)) { + goto error; } + // Here we use the input callback to export functions from Redis to the backend, + // by setting the backend's function pointers to the corresponding functions in Redis. init_backend(RedisModule_GetApi); backend.model_create = (RAI_Model * (*)(RAI_Backend, const char *, RAI_ModelOpts, const char *, size_t, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ModelCreateTFLite"); - if (backend.model_create == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelCreateTFLite. TFLITE " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_create, "RAI_ModelCreateTFLite", "TFLite", path)) { + goto error; } backend.model_free = (void (*)(RAI_Model *, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ModelFreeTFLite"); - if (backend.model_free == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelFreeTFLite. TFLITE " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_free, "RAI_ModelFreeTFLite", "TFLite", path)) { + goto error; } backend.model_run = (int (*)(RAI_Model * model, RAI_ExecutionCtx * *ectxs, RAI_Error * error))( unsigned long)dlsym(handle, "RAI_ModelRunTFLite"); - if (backend.model_run == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelRunTFLite. TFLITE " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_run, "RAI_ModelRunTFLite", "TFLite", path)) { + goto error; } backend.model_serialize = (int (*)(RAI_Model *, char **, size_t *, RAI_Error *))( unsigned long)dlsym(handle, "RAI_ModelSerializeTFLite"); - if (backend.model_serialize == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelSerializeTFLite. TFLITE " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_serialize, "RAI_ModelSerializeTFLite", "TFLite", + path)) { + goto error; } backend.get_version = (const char *(*)(void))(unsigned long)dlsym(handle, "RAI_GetBackendVersionTFLite"); - if (backend.get_version == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_GetBackendVersionTFLite. 
TFLite backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.get_version, "RAI_GetBackendVersionTFLite", "TFLite", + path)) { + goto error; } RAI_backends.tflite = backend; - RedisModule_Log(ctx, "notice", "TFLITE backend loaded from %s", path); - return REDISMODULE_OK; + +error: + dlclose(handle); + return REDISMODULE_ERR; } int RAI_LoadBackend_Torch(RedisModuleCtx *ctx, const char *path) { @@ -249,122 +236,82 @@ int RAI_LoadBackend_Torch(RedisModuleCtx *ctx, const char *path) { } void *handle = dlopen(path, RTLD_NOW | RTLD_LOCAL); - if (handle == NULL) { RedisModule_Log(ctx, "warning", "Could not load TORCH backend from %s: %s", path, dlerror()); return REDISMODULE_ERR; } - RAI_LoadedBackend backend = {0}; + RAI_LoadedBackend backend = {0}; // Initialize all the callbacks to NULL. int (*init_backend)(int (*)(const char *, void *)); init_backend = (int (*)(int (*)(const char *, void *)))(unsigned long)dlsym( handle, "RAI_InitBackendTorch"); - if (init_backend == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_InitBackendTorch. TORCH " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, init_backend, "RAI_InitBackendTorch", "TORCH", path)) { + goto error; } + // Here we use the input callback to export functions from Redis to the backend, + // by setting the backend's function pointers to the corresponding functions in Redis. init_backend(RedisModule_GetApi); backend.model_create = (RAI_Model * (*)(RAI_Backend, const char *, RAI_ModelOpts, const char *, size_t, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ModelCreateTorch"); - if (backend.model_create == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelCreateTorch. TORCH " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_create, "RAI_ModelCreateTorch", "TORCH", path)) { + goto error; } backend.model_free = (void (*)(RAI_Model *, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ModelFreeTorch"); - if (backend.model_free == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelFreeTorch. TORCH backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_free, "RAI_ModelFreeTorch", "TORCH", path)) { + goto error; } backend.model_run = (int (*)(RAI_Model * model, RAI_ExecutionCtx * *ectxs, RAI_Error * error))( unsigned long)dlsym(handle, "RAI_ModelRunTorch"); - if (backend.model_run == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelRunTorch. TORCH backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_run, "RAI_ModelRunTorch", "TORCH", path)) { + goto error; } backend.model_serialize = (int (*)(RAI_Model *, char **, size_t *, RAI_Error *))( unsigned long)dlsym(handle, "RAI_ModelSerializeTorch"); - if (backend.model_serialize == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelSerializeTorch. 
TORCH " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_serialize, "RAI_ModelSerializeTorch", "TORCH", + path)) { + goto error; } backend.script_create = (RAI_Script * (*)(const char *, const char *, RAI_Error *))( unsigned long)dlsym(handle, "RAI_ScriptCreateTorch"); - if (backend.script_create == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ScriptCreateTorch. TORCH " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.script_create, "RAI_ScriptCreateTorch", "TORCH", path)) { + goto error; } backend.script_free = (void (*)(RAI_Script *, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ScriptFreeTorch"); - if (backend.script_free == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ScriptFreeTorch. TORCH " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.script_free, "RAI_ScriptFreeTorch", "TORCH", path)) { + goto error; } backend.script_run = (int (*)(RAI_Script *, const char *, RAI_ExecutionCtx *, RAI_Error *))( unsigned long)dlsym(handle, "RAI_ScriptRunTorch"); - if (backend.script_run == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ScriptRunTorch. TORCH backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.script_run, "RAI_ScriptRunTorch", "TORCH", path)) { + goto error; } backend.get_version = (const char *(*)(void))(unsigned long)dlsym(handle, "RAI_GetBackendVersionTorch"); - if (backend.get_version == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_GetBackendVersionTorch. TORCH backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.get_version, "RAI_GetBackendVersionTorch", "TORCH", + path)) { + goto error; } RAI_backends.torch = backend; - RedisModule_Log(ctx, "notice", "TORCH backend loaded from %s", path); - return REDISMODULE_OK; + +error: + dlclose(handle); + return REDISMODULE_ERR; } int RAI_LoadBackend_ONNXRuntime(RedisModuleCtx *ctx, const char *path) { @@ -379,100 +326,93 @@ int RAI_LoadBackend_ONNXRuntime(RedisModuleCtx *ctx, const char *path) { RedisModule_Log(ctx, "warning", "Could not load ONNX backend from %s: %s", path, dlerror()); return REDISMODULE_ERR; } - RAI_LoadedBackend backend = {0}; - int (*init_backend)(int (*)(const char *, void *)); + int (*init_backend)(int (*)(const char *, void **)); init_backend = - (int (*)(int (*)(const char *, void *)))(unsigned long)dlsym(handle, "RAI_InitBackendORT"); - if (init_backend == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_InitBackendORT. ONNX backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + (int (*)(int (*)(const char *, void **)))(unsigned long)dlsym(handle, "RAI_InitBackendORT"); + if (!_ValidateFuncExists(ctx, init_backend, "RAI_InitBackendORT", "ONNX", path)) { + goto error; } - init_backend(RedisModule_GetApi); + // Here we use the input callback to export functions from Redis and RedisAI + // to the backend, by setting the backend's function pointers to the + // corresponding functions in Redis/RedisAI. 
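Before the `init_backend(RAI_ExportFunc)` call that follows, it is worth seeing this handshake from the backend's side: the init callback arrives as a name-to-pointer resolver (`RAI_ExportFunc` for ONNX, plain `RedisModule_GetApi` for the other backends). A hedged sketch of how a consumer of `backedns_api.h` uses it (toy init function, not the patch's code):

```
/* Pointers the backend fills in at init time, as declared in backedns_api.h. */
static long long (*RedisAI_GetModelExecutionTimeout)(void);
static long (*RedisAI_GetThreadId)(void);

int ToyInitBackend(int (*get_api_fn)(const char *, void **)) {
    /* get_api_fn maps a name to the corresponding Redis/RedisAI function. */
    if (get_api_fn("GetModelExecutionTimeout", (void **)&RedisAI_GetModelExecutionTimeout) != 0)
        return 1;
    if (get_api_fn("GetThreadId", (void **)&RedisAI_GetThreadId) != 0)
        return 1;
    return 0;
}
```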
+ init_backend(RAI_ExportFunc); backend.model_create = (RAI_Model * (*)(RAI_Backend, const char *, RAI_ModelOpts, const char *, size_t, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ModelCreateORT"); - if (backend.model_create == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelCreateORT. ONNX backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_create, "RAI_ModelCreateORT", "ONNX", path)) { + goto error; } backend.model_free = (void (*)(RAI_Model *, RAI_Error *))(unsigned long)dlsym(handle, "RAI_ModelFreeORT"); - if (backend.model_free == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelFreeORT. ONNX backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_free, "RAI_ModelFreeORT", "ONNX", path)) { + goto error; } backend.model_run = (int (*)(RAI_Model * model, RAI_ExecutionCtx * *ectxs, RAI_Error * error))( unsigned long)dlsym(handle, "RAI_ModelRunORT"); - if (backend.model_run == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelRunORT. ONNX backend not " - "loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_run, "RAI_ModelRunORT", "ONNX", path)) { + goto error; } backend.model_serialize = (int (*)(RAI_Model *, char **, size_t *, RAI_Error *))( unsigned long)dlsym(handle, "RAI_ModelSerializeORT"); - if (backend.model_serialize == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_ModelSerializeORT. ONNX " - "backend not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.model_serialize, "RAI_ModelSerializeORT", "ONNX", path)) { + goto error; } backend.get_version = (const char *(*)(void))(unsigned long)dlsym(handle, "RAI_GetBackendVersionORT"); - if (backend.get_version == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_GetBackendVersionORT. ONNX backend " - "not loaded from %s", - path); - return REDISMODULE_ERR; + if (!_ValidateFuncExists(ctx, backend.get_version, "RAI_GetBackendVersionORT", "ONNX", path)) { + goto error; } backend.get_memory_info = (unsigned long long (*)(void))(unsigned long)dlsym(handle, "RAI_GetMemoryInfoORT"); - if (backend.get_memory_info == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_GetMemoryInfoORT. ONNX backend " - "not loaded from %s", - path); + if (!_ValidateFuncExists(ctx, backend.get_memory_info, "RAI_GetMemoryInfoORT", "ONNX", path)) { + goto error; } + backend.get_memory_access_num = (unsigned long long (*)(void))(unsigned long)dlsym(handle, "RAI_GetMemoryAccessORT"); - if (backend.get_memory_access_num == NULL) { - dlclose(handle); - RedisModule_Log(ctx, "warning", - "Backend does not export RAI_GetMemoryAccessORT. 
ONNX backend "
-                        "not loaded from %s",
-                        path);
+    if (!_ValidateFuncExists(ctx, backend.get_memory_access_num, "RAI_GetMemoryAccessORT", "ONNX",
+                             path)) {
+        goto error;
+    }
+
+    backend.stop_long_running_sessions_cb =
+        (void (*)(RedisModuleCtx *, RedisModuleEvent, uint64_t, void *))(unsigned long)dlsym(
+            handle, "RAI_EnforceTimeoutORT");
+    if (!_ValidateFuncExists(ctx, backend.stop_long_running_sessions_cb, "RAI_EnforceTimeoutORT",
+                             "ONNX", path)) {
+        goto error;
+    }
+
+    backend.add_new_device_cb =
+        (int (*)(const char *))(unsigned long)dlsym(handle, "RAI_AddNewDeviceORT");
+    if (!_ValidateFuncExists(ctx, backend.add_new_device_cb, "RAI_AddNewDeviceORT", "ONNX", path)) {
+        goto error;
+    }
+
+    backend.get_max_run_sessions =
+        (size_t(*)(void))(unsigned long)dlsym(handle, "RAI_GetGlobalRunSessionsLenORT");
+    if (!_ValidateFuncExists(ctx, backend.get_max_run_sessions, "RAI_GetGlobalRunSessionsLenORT",
+                             "ONNX", path)) {
+        goto error;
+    }
+
+    RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_CronLoop,
+                                       backend.stop_long_running_sessions_cb);
 
     RAI_backends.onnx = backend;
     RedisModule_Log(ctx, "notice", "ONNX backend loaded from %s", path);
     return REDISMODULE_OK;
+
+error:
+    dlclose(handle);
+    return REDISMODULE_ERR;
 }
 
 int RAI_LoadBackend(RedisModuleCtx *ctx, int backend, const char *path) {
diff --git a/src/backends/backends.h b/src/backends/backends.h
index 0b470b822..0345f8c04 100644
--- a/src/backends/backends.h
+++ b/src/backends/backends.h
@@ -82,6 +82,15 @@ typedef struct RAI_LoadedBackend {
     // Returns the number of times that Redis accessed backend allocator.
     unsigned long long (*get_memory_access_num)(void);
+
+    // A callback to use whenever a new device is introduced.
+    int (*add_new_device_cb)(const char *);
+
+    // Kill run session callback (for stopping long runs).
+    void (*stop_long_running_sessions_cb)(RedisModuleCtx *, RedisModuleEvent, uint64_t, void *);
+
+    // Get the number of maximum run sessions that can run.
+    size_t (*get_max_run_sessions)(void);
 } RAI_LoadedBackend;
 
 typedef struct RAI_LoadedBackends {
@@ -92,9 +101,12 @@ typedef struct RAI_LoadedBackends {
 } RAI_LoadedBackends;
 
 RAI_LoadedBackends RAI_backends;
-char *RAI_BackendsPath;
 
 int RAI_LoadBackend(RedisModuleCtx *ctx, int backend, const char *path);
+
 int RAI_LoadDefaultBackend(RedisModuleCtx *ctx, int backend);
 
-const char *RAI_BackendName(int backend);
+/**
+ * @brief Returns the backend name as string.
+ */
+const char *RAI_GetBackendName(RAI_Backend backend);
diff --git a/src/backends/onnx_timeout.c b/src/backends/onnx_timeout.c
new file mode 100644
index 000000000..6e3a715ad
--- /dev/null
+++ b/src/backends/onnx_timeout.c
@@ -0,0 +1,117 @@
+#include "onnx_timeout.h"
+#include "util/arr.h"
+#include "execution/utils.h"
+#include "config/config.h"
+#include <pthread.h>
+#include "util/string_utils.h"
+#include "redis_ai_objects/stats.h"
+#include "backedns_api.h"
+
+int RAI_InitGlobalRunSessionsORT() {
+    onnx_global_run_sessions = RedisModule_Alloc(sizeof(OnnxGlobalRunSessions));
+
+    // Initialize the array with entries number equals to the number of currently
+    // working threads in RedisAI (note that CPU threads must exist from the start).
+    size_t RAI_working_threads_num = RedisAI_GetThreadsCount();
+    OnnxRunSessionCtx **run_sessions_array =
+        array_new(OnnxRunSessionCtx *, RAI_working_threads_num);
+    for (size_t i = 0; i < RAI_working_threads_num; i++) {
+        OnnxRunSessionCtx *entry = RedisModule_Calloc(1, sizeof(OnnxRunSessionCtx));
+        entry->runState = RedisModule_Calloc(1, sizeof(entry->runState));
+        *entry->runState = RUN_SESSION_AVAILABLE;
+        run_sessions_array = array_append(run_sessions_array, entry);
+    }
+    onnx_global_run_sessions->OnnxRunSessions = run_sessions_array;
+    pthread_rwlock_init(&(onnx_global_run_sessions->rwlock), NULL);
+
+    return REDISMODULE_OK;
+}
+
+size_t RAI_GetGlobalRunSessionsLenORT() {
+    pthread_rwlock_rdlock(&(onnx_global_run_sessions->rwlock));
+    size_t len = array_len(onnx_global_run_sessions->OnnxRunSessions);
+    pthread_rwlock_unlock(&(onnx_global_run_sessions->rwlock));
+    return len;
+}
+
+int RAI_AddNewDeviceORT(const char *device_str) {
+
+    // Acquire write lock, as we might reallocate the array while extending it.
+    pthread_rwlock_wrlock(&(onnx_global_run_sessions->rwlock));
+    OnnxRunSessionCtx **run_sessions_array = onnx_global_run_sessions->OnnxRunSessions;
+
+    // Extend the array with an entry for every working thread on the new device,
+    // initialized to NULL.
+    size_t size = RedisAI_GetNumThreadsPerQueue();
+    for (size_t i = 0; i < size; i++) {
+        OnnxRunSessionCtx *entry = RedisModule_Calloc(1, sizeof(OnnxRunSessionCtx));
+        entry->runState = RedisModule_Calloc(1, sizeof(entry->runState));
+        *entry->runState = RUN_SESSION_AVAILABLE;
+        run_sessions_array = array_append(run_sessions_array, entry);
+    }
+    onnx_global_run_sessions->OnnxRunSessions = run_sessions_array;
+    pthread_rwlock_unlock(&(onnx_global_run_sessions->rwlock));
+    return REDISMODULE_OK;
+}
+
+void RAI_EnforceTimeoutORT(RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent,
+                           void *data) {
+    RedisModule_Assert(eid.id == REDISMODULE_EVENT_CRON_LOOP);
+    const OrtApi *ort = OrtGetApiBase()->GetApi(1);
+    pthread_rwlock_rdlock(&(onnx_global_run_sessions->rwlock));
+    OnnxRunSessionCtx **run_sessions_ctx = onnx_global_run_sessions->OnnxRunSessions;
+    size_t len = array_len(run_sessions_ctx);
+    long long curr_time = mstime();
+    long long timeout = RedisAI_GetModelExecutionTimeout();
+    for (size_t i = 0; i < len; i++) {
+        // Check if a session has been running for too long, and kill it if it is still active.
+        if (curr_time - run_sessions_ctx[i]->queuingTime > timeout) {
+            if (__sync_bool_compare_and_swap(run_sessions_ctx[i]->runState, RUN_SESSION_ACTIVE,
+                                             RUN_SESSION_INVALID)) {
+                // Set termination flag, validate that ONNX API succeeded (returns NULL)
+                RedisModule_Assert(ort->RunOptionsSetTerminate(run_sessions_ctx[i]->runOptions) ==
+                                   NULL);
+                __atomic_store_n(run_sessions_ctx[i]->runState, RUN_SESSION_TERMINATED,
+                                 __ATOMIC_RELAXED);
+            }
+        }
+    }
+    pthread_rwlock_unlock(&(onnx_global_run_sessions->rwlock));
+}
+
+void RAI_ActivateRunSessionCtxORT(OrtRunOptions *new_run_options, long *run_session_index) {
+
+    pthread_rwlock_rdlock(&(onnx_global_run_sessions->rwlock));
+    // Get the thread id (which is the corresponding index in the global sessions array + 1).
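The compare-and-swap in `RAI_EnforceTimeoutORT` above is what prevents the main thread and a BG worker from racing on the same entry: only the thread that wins the `ACTIVE -> INVALID` transition may call `RunOptionsSetTerminate`, and the worker's reset (continued inside `RAI_ActivateRunSessionCtxORT`/`RAI_ResetRunSessionCtxORT` just below) waits until the state settles. A toy, self-contained illustration of that guard using the same GCC/Clang builtins:

```
#include <stdio.h>

typedef enum { AVAILABLE, ACTIVE, TERMINATED, INVALID } ToyRunState;

int main(void) {
    ToyRunState state = ACTIVE; /* imagine a session that exceeded its timeout */
    /* Claim the entry; a concurrent reset of the same entry would make this fail. */
    if (__sync_bool_compare_and_swap(&state, ACTIVE, INVALID)) {
        /* safe to flag the run options for termination here */
        __atomic_store_n(&state, TERMINATED, __ATOMIC_RELAXED);
    }
    printf("final state: %d\n", (int)state); /* prints 2 (TERMINATED) */
    return 0;
}
```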
+    // if thread id is -1, we are not running from RedisAI thread (not allowed)
+    *run_session_index = RedisAI_GetThreadId();
+    if (*run_session_index == -1) {
+        pthread_rwlock_unlock(&(onnx_global_run_sessions->rwlock));
+        return;
+    }
+    OnnxRunSessionCtx *entry = onnx_global_run_sessions->OnnxRunSessions[*run_session_index];
+    RedisModule_Assert(*entry->runState == RUN_SESSION_AVAILABLE);
+
+    // Update the entry with the current session data.
+    entry->runOptions = new_run_options;
+    entry->queuingTime = mstime();
+    __atomic_store_n(entry->runState, RUN_SESSION_ACTIVE, __ATOMIC_RELAXED);
+    pthread_rwlock_unlock(&(onnx_global_run_sessions->rwlock));
+}
+
+void RAI_ResetRunSessionCtxORT(long run_session_index) {
+    const OrtApi *ort = OrtGetApiBase()->GetApi(1);
+    pthread_rwlock_rdlock(&(onnx_global_run_sessions->rwlock));
+    OnnxRunSessionCtx *entry = onnx_global_run_sessions->OnnxRunSessions[run_session_index];
+
+    // Busy wait until we get a valid state, as we might access this entry from
+    // the main thread callback and call ONNX API to terminate the run session.
+    RunSessionState state;
+    do {
+        state = __atomic_load_n(entry->runState, __ATOMIC_RELAXED);
+    } while (state != RUN_SESSION_ACTIVE && state != RUN_SESSION_TERMINATED);
+
+    ort->ReleaseRunOptions(entry->runOptions);
+    __atomic_store_n(entry->runState, RUN_SESSION_AVAILABLE, __ATOMIC_RELAXED);
+    pthread_rwlock_unlock(&(onnx_global_run_sessions->rwlock));
+}
diff --git a/src/backends/onnx_timeout.h b/src/backends/onnx_timeout.h
new file mode 100644
index 000000000..63935fe83
--- /dev/null
+++ b/src/backends/onnx_timeout.h
@@ -0,0 +1,83 @@
+#pragma once
+
+#include "backends/onnxruntime.h"
+#include "onnxruntime_c_api.h"
+
+/**
+ * The possible states for every run session entry in the array (entry per BG thread):
+ * Every entry is initialized as AVAILABLE, which means that it is ready to get a new run session.
+ * A BG thread can perform a transition from AVAILABLE to ACTIVE upon starting a new run session.
+ * In the cron callback, Redis main thread can perform a transition from ACTIVE to
+ * INVALID if a timeout has been reached; it sets the run session as terminated, and then makes
+ * another transition to TERMINATED.
+ * At the end of a run session, the state is ACTIVE/TERMINATED, and then the BG thread
+ * resets the entry and makes a transition back to AVAILABLE.
+ * Transitions are done atomically to ensure correct synchronization (a BG thread cannot reset
+ * a run session while the main thread is setting it as terminated).
+ */
+typedef enum {
+    RUN_SESSION_AVAILABLE,
+    RUN_SESSION_ACTIVE,
+    RUN_SESSION_TERMINATED,
+    RUN_SESSION_INVALID
+} RunSessionState;
+
+typedef struct OnnxRunSessionCtx {
+    long long queuingTime;
+    OrtRunOptions *runOptions;
+    RunSessionState *runState;
+} OnnxRunSessionCtx;
+
+// This is a global array of OnnxRunSessionCtx. Contains an entry for every thread
+// (on every device) that onnx models may run on.
+typedef struct OnnxGlobalRunSessions {
+    OnnxRunSessionCtx **OnnxRunSessions;
+    pthread_rwlock_t rwlock;
+} OnnxGlobalRunSessions;
+
+OnnxGlobalRunSessions *onnx_global_run_sessions;
+
+/**
+ * @brief This is called whenever Onnx backend is loaded. It creates the global
+ * OnnxGlobalRunSessions structure with entry-per-thread (for CPU threads at first),
+ * so that every thread will have a designated entry to update with the onnx session
+ * that it's going to run.
+ */
+int RAI_InitGlobalRunSessionsORT(void);
+
+/**
+ * @return The length of the global array (should be the number of current working threads)
+ */
+size_t RAI_GetGlobalRunSessionsLenORT(void);
+
+/**
+ * @brief This is called whenever RedisAI gets a request to store a model that runs
+ * on a new device, and creates more working threads, as configured in
+ * THREADS_PER_QUEUE. Thus, the global array of onnx sessions that has an
+ * entry-per-thread is extended accordingly.
+ */
+int RAI_AddNewDeviceORT(const char *device_str);
+
+/**
+ * @brief A callback that is registered to the Redis cron event, that is, it is called
+ * periodically and goes over all the (possibly running) onnx sessions, and kills
+ * those that exceed the timeout.
+ */
+void RAI_EnforceTimeoutORT(RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent,
+                           void *data);
+
+/**
+ * @brief Set a new OrtRunOptions in the global structure, to allow us to
+ * "terminate" the run session from the cron callback.
+ * @param new_run_options - The newly created OrtRunOptions to store.
+ * @param run_session_index - placeholder for the index of the running thread
+ * in the global array, to have a quick access later to clean this entry.
+ */
+void RAI_ActivateRunSessionCtxORT(OrtRunOptions *new_run_options, long *run_session_index);
+
+/**
+ * @brief Release the OrtRunOptions of a session that finished its run and
+ * reset the corresponding entry in the global structure.
+ * @param run_session_index - The entry index where OrtRunOptions was stored.
+ */
+void RAI_ResetRunSessionCtxORT(long run_session_index);
diff --git a/src/backends/onnxruntime.c b/src/backends/onnxruntime.c
index 7c6db175d..c6aef1310 100644
--- a/src/backends/onnxruntime.c
+++ b/src/backends/onnxruntime.c
@@ -2,12 +2,14 @@
 #include
 #include "backends/util.h"
 #include
-#include
+#include
+#include
 #include "util/arr.h"
 #include "backends/onnxruntime.h"
 #include "redis_ai_objects/tensor.h"
 
 #include "onnxruntime_c_api.h"
+#include "backedns_api.h"
 
 // Use as a wrapper for ORT api call. If ORT api hasn't returned null, it has failed.
 // A label "error" must exist in every function that uses this macro.
@@ -78,7 +80,8 @@ unsigned long long RAI_GetMemoryInfoORT() { return OnnxMemory; }
 
 unsigned long long RAI_GetMemoryAccessORT() { return OnnxMemoryAccessCounter; }
 
-int RAI_InitBackendORT(int (*get_api_fn)(const char *, void *)) {
+int RAI_InitBackendORT(int (*get_api_fn)(const char *, void **)) {
+    // Export redis callbacks.
     get_api_fn("RedisModule_Alloc", ((void **)&RedisModule_Alloc));
     get_api_fn("RedisModule_Calloc", ((void **)&RedisModule_Calloc));
     get_api_fn("RedisModule_Free", ((void **)&RedisModule_Free));
@@ -88,6 +91,16 @@ int RAI_InitBackendORT(int (*get_api_fn)(const char *, void *)) {
     get_api_fn("RedisModule_GetThreadSafeContext", ((void **)&RedisModule_GetThreadSafeContext));
     get_api_fn("RedisModule_FreeThreadSafeContext", ((void **)&RedisModule_FreeThreadSafeContext));
     get_api_fn("RedisModule_MallocSize", ((void **)&RedisModule_MallocSize));
+
+    // Export RedisAI callbacks.
+    get_api_fn("GetThreadId", ((void **)&RedisAI_GetThreadId));
+    get_api_fn("GetNumThreadsPerQueue", ((void **)&RedisAI_GetNumThreadsPerQueue));
+    get_api_fn("GetModelExecutionTimeout", ((void **)&RedisAI_GetModelExecutionTimeout));
+    get_api_fn("GetThreadsCount", ((void **)&RedisAI_GetThreadsCount));
+
+    // Create a global array of onnx runSessions, with an entry for every working thread.
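Before the `RAI_InitGlobalRunSessionsORT()` call that follows, note that the whole timeout mechanism leans on one onnxruntime primitive: an `OrtRunOptions` object can be flagged from another thread, after which an in-flight `Run()` call that was started with it fails fast instead of running to completion. A compressed, hedged sketch of that lifecycle (error handling elided):

```
#include "onnxruntime_c_api.h"

void ToyRunOptionsLifecycle(void) {
    const OrtApi *ort = OrtGetApiBase()->GetApi(1);
    OrtRunOptions *run_options = NULL;
    ort->CreateRunOptions(&run_options);      /* BG thread, just before ort->Run(...) */
    /* ... published via RAI_ActivateRunSessionCtxORT(run_options, &index) ... */
    ort->RunOptionsSetTerminate(run_options); /* main thread, on timeout: aborts the Run() */
    ort->ReleaseRunOptions(run_options);      /* BG thread, in RAI_ResetRunSessionCtxORT() */
}
```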
+ RAI_InitGlobalRunSessionsORT(); + return REDISMODULE_OK; } @@ -292,7 +305,7 @@ RAI_Tensor *RAI_TensorCreateFromOrtValue(OrtValue *v, size_t batch_offset, long size_t elem_count; ONNX_VALIDATE_STATUS(ort->GetTensorShapeElementCount(info, &elem_count)) - const size_t len = ceil((double)dtype.bits * elem_count / 8); + const size_t len = dtype.bits * elem_count / 8; const size_t total_bytesize = len * sizeof(char); const size_t sample_bytesize = total_bytesize / total_batch_size; const size_t batch_bytesize = sample_bytesize * batch_size; @@ -518,6 +531,8 @@ int RAI_ModelRunORT(RAI_Model *model, RAI_ExecutionCtx **ectxs, RAI_Error *error array_new_on_stack(const char *, 5, output_names); array_new_on_stack(OrtValue *, 5, inputs); array_new_on_stack(OrtValue *, 5, outputs); + OrtRunOptions *run_options = NULL; + long run_session_index; OrtTensorTypeAndShapeInfo *info = NULL; { size_t n_input_nodes; @@ -565,10 +580,23 @@ int RAI_ModelRunORT(RAI_Model *model, RAI_ExecutionCtx **ectxs, RAI_Error *error outputs = array_append(outputs, NULL); } - OrtRunOptions *run_options = NULL; + ONNX_VALIDATE_STATUS(ort->CreateRunOptions(&run_options)); + // Set the created run option in the global RunSessions and save its index. + RAI_ActivateRunSessionCtxORT(run_options, &run_session_index); + if (run_session_index == -1) { + RAI_SetError( + error, RAI_EMODELRUN, + "Cannot execute onnxruntime model synchronously, use async execution instead"); + ort->ReleaseRunOptions(run_options); + run_options = NULL; + goto error; + } + ONNX_VALIDATE_STATUS(ort->Run(session, run_options, input_names, (const OrtValue *const *)inputs, n_input_nodes, output_names, n_output_nodes, outputs)); + RAI_ResetRunSessionCtxORT(run_session_index); + run_options = NULL; for (uint32_t i = 0; i < ninputs; i++) { status = ort->AllocatorFree(global_allocator, (void *)input_names[i]); @@ -654,6 +682,9 @@ int RAI_ModelRunORT(RAI_Model *model, RAI_ExecutionCtx **ectxs, RAI_Error *error if (info) { ort->ReleaseTensorTypeAndShapeInfo(info); } + if (run_options) { + RAI_ResetRunSessionCtxORT(run_session_index); + } return REDISMODULE_ERR; } diff --git a/src/backends/onnxruntime.h b/src/backends/onnxruntime.h index a87777230..d165af32c 100644 --- a/src/backends/onnxruntime.h +++ b/src/backends/onnxruntime.h @@ -9,7 +9,7 @@ unsigned long long RAI_GetMemoryInfoORT(void); unsigned long long RAI_GetMemoryAccessORT(void); -int RAI_InitBackendORT(int (*get_api_fn)(const char *, void *)); +int RAI_InitBackendORT(int (*get_api_fn)(const char *, void **)); RAI_Model *RAI_ModelCreateORT(RAI_Backend backend, const char *devicestr, RAI_ModelOpts opts, const char *modeldef, size_t modellen, RAI_Error *err); diff --git a/src/config/config.c b/src/config/config.c index a8e43825a..e51d43692 100644 --- a/src/config/config.c +++ b/src/config/config.c @@ -1,101 +1,82 @@ #include "config.h" - -#include #include -#include -#include - #include "redismodule.h" -#include "rmutil/alloc.h" -#include "backends/util.h" #include "backends/backends.h" -#include "util/dict.h" -#include "util/queue.h" -#include "util/arr.h" -#include "execution/background_workers.h" -long long backends_intra_op_parallelism; // number of threads used within an - // individual op for parallelism. -long long backends_inter_op_parallelism; // number of threads used for parallelism - // between independent operations. -long long model_chunk_size; // size of chunks used to break up model payloads. +// Default configs +char *BackendsPath = NULL; // Path to backends dir. 
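The defaults declared here (and just below) can all be overridden as module arguments at load time, handled by `_Config_LoadTimeParamParse`. An illustrative invocation, assuming the module binary is `./redisai.so`:

```
redis-server --loadmodule ./redisai.so THREADS_PER_QUEUE 4 MODEL_EXECUTION_TIMEOUT 10000
```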
+ +long long BackendsIntraOpParallelism = 0; // number of threads used within an + // individual op for parallelism. +long long BackendsInterOpParallelism = 0; // number of threads used for parallelism + // between independent operations. +long long ModelChunkSize = 535822336; // size of chunks used to break up model payloads. + // default is 511 * 1024 * 1024 +long long ThreadPoolSizePerQueue = 1; // Number of working threads for device. -/** - * - * @return number of threads used within an individual op for parallelism. - */ -long long getBackendsInterOpParallelism() { return backends_inter_op_parallelism; } +long long ModelExecutionTimeout = 5000; // The maximum time in milliseconds + // before killing onnx run session. -/** - * Set number of threads used for parallelism between independent operations, by - * backend. - * - * @param num_threads - * @return 0 on success, or 1 if failed - */ -int setBackendsInterOpParallelism(long long num_threads) { - int result = 1; - if (num_threads >= 0) { - backends_inter_op_parallelism = num_threads; - result = 0; +static int _Config_LoadTimeParamParse(RedisModuleCtx *ctx, const char *key, const char *val, + RedisModuleString *rsval) { + int ret = REDISMODULE_OK; + if (strcasecmp((key), "TF") == 0) { + ret = RAI_LoadBackend(ctx, RAI_BACKEND_TENSORFLOW, (val)); + } else if (strcasecmp((key), "TFLITE") == 0) { + ret = RAI_LoadBackend(ctx, RAI_BACKEND_TFLITE, (val)); + } else if (strcasecmp((key), "TORCH") == 0) { + ret = RAI_LoadBackend(ctx, RAI_BACKEND_TORCH, (val)); + } else if (strcasecmp((key), "ONNX") == 0) { + ret = RAI_LoadBackend(ctx, RAI_BACKEND_ONNXRUNTIME, (val)); + } + // enable configuring the main thread to create a fixed number of worker + // threads up front per device. by default we'll use 1 + else if (strcasecmp((key), "THREADS_PER_QUEUE") == 0) { + ret = Config_SetQueueThreadsNum(rsval); + if (ret == REDISMODULE_OK) { + RedisModule_Log(ctx, "notice", "%s: %s", REDISAI_INFOMSG_THREADS_PER_QUEUE, (val)); + } + } else if (strcasecmp((key), "INTRA_OP_PARALLELISM") == 0) { + ret = Config_SetIntraOperationParallelism(rsval); + if (ret == REDISMODULE_OK) { + RedisModule_Log(ctx, "notice", "%s: %s", REDISAI_INFOMSG_INTRA_OP_PARALLELISM, val); + } + } else if (strcasecmp((key), "INTER_OP_PARALLELISM") == 0) { + ret = Config_SetInterOperationParallelism(rsval); + if (ret == REDISMODULE_OK) { + RedisModule_Log(ctx, "notice", "%s: %s", REDISAI_INFOMSG_INTER_OP_PARALLELISM, val); + } + } else if (strcasecmp((key), "MODEL_CHUNK_SIZE") == 0) { + ret = Config_SetModelChunkSize(rsval); + if (ret == REDISMODULE_OK) { + RedisModule_Log(ctx, "notice", "%s: %s", REDISAI_INFOMSG_MODEL_CHUNK_SIZE, val); + } + } else if (strcasecmp((key), "MODEL_EXECUTION_TIMEOUT") == 0) { + ret = Config_SetModelExecutionTimeout(rsval); + if (ret == REDISMODULE_OK) { + RedisModule_Log(ctx, "notice", "%s: %s", REDISAI_INFOMSG_MODEL_EXECUTION_TIMEOUT, val); + } + } else if (strcasecmp((key), "BACKENDSPATH") == 0) { + // already taken care of + } else { + ret = REDISMODULE_ERR; } - return result; + return ret; } -/** - * - * @return - */ -long long getBackendsIntraOpParallelism() { return backends_intra_op_parallelism; } +long long Config_GetBackendsInterOpParallelism() { return BackendsInterOpParallelism; } -/** - * Set number of threads used within an individual op for parallelism, by - * backend. 
- * - * @param num_threads - * @return 0 on success, or 1 if failed - */ -int setBackendsIntraOpParallelism(long long num_threads) { - int result = 1; - if (num_threads >= 0) { - backends_intra_op_parallelism = num_threads; - result = 0; - } - return result; -} +long long Config_GetBackendsIntraOpParallelism() { return BackendsIntraOpParallelism; } -/** - * @return size of chunks (in bytes) in which models are split for - * set, get, serialization and replication. - */ -long long getModelChunkSize() { return model_chunk_size; } +long long Config_GetModelChunkSize() { return ModelChunkSize; } -/** - * Set size of chunks (in bytes) in which models are split for set, - * get, serialization and replication. - * - * @param size - * @return 0 on success, or 1 if failed - */ -int setModelChunkSize(long long size) { - int result = 1; - if (size > 0) { - model_chunk_size = size; - result = 0; - } - return result; -} +long long Config_GetNumThreadsPerQueue() { return ThreadPoolSizePerQueue; } + +long long Config_GetModelExecutionTimeout() { return ModelExecutionTimeout; } -/** - * Helper method for AI.CONFIG LOADBACKEND - * - * - * @param ctx Context in which Redis modules operate - * @param argv Redis command arguments, as an array of strings - * @param argc Redis command number of arguments - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if the DAGRUN failed - */ -int RedisAI_Config_LoadBackend(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +char *Config_GetBackendsPath() { return BackendsPath; } + +int Config_LoadBackend(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 3) return RedisModule_WrongArity(ctx); @@ -114,183 +95,92 @@ int RedisAI_Config_LoadBackend(RedisModuleCtx *ctx, RedisModuleString **argv, in } else { return RedisModule_ReplyWithError(ctx, "ERR unsupported backend"); } - if (result == REDISMODULE_OK) { return RedisModule_ReplyWithSimpleString(ctx, "OK"); } - return RedisModule_ReplyWithError(ctx, "ERR error loading backend"); } -/** - * Helper method for AI.CONFIG BACKENDSPATH - * - * - * @param ctx Context in which Redis modules operate - * @param path string containing backend path - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if the DAGRUN failed - */ -int RedisAI_Config_BackendsPath(RedisModuleCtx *ctx, const char *path) { - if (RAI_BackendsPath != NULL) { - RedisModule_Free(RAI_BackendsPath); +void Config_SetBackendsPath(const char *path) { + if (BackendsPath != NULL) { + RedisModule_Free(BackendsPath); } - RAI_BackendsPath = RedisModule_Strdup(path); - - return RedisModule_ReplyWithSimpleString(ctx, "OK"); + BackendsPath = RedisModule_Strdup(path); } -/** - * Set number of threads used for parallelism between RedisAI independent - * blocking commands ( AI.DAGRUN, AI.SCRIPTRUN, AI.MODELRUN ). 
- * - * @param num_threads_string string containing thread number - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed - */ -int RedisAI_Config_QueueThreads(RedisModuleString *num_threads_string) { - int result = RedisModule_StringToLongLong(num_threads_string, &perqueueThreadPoolSize); - // make sure the number of threads is a positive integer - // if not set the value to the default - if (result == REDISMODULE_OK && perqueueThreadPoolSize < 1) { - perqueueThreadPoolSize = REDISAI_DEFAULT_THREADS_PER_QUEUE; - result = REDISMODULE_ERR; +int Config_SetQueueThreadsNum(RedisModuleString *num_threads_string) { + long long val; + int result = RedisModule_StringToLongLong(num_threads_string, &val); + if (result != REDISMODULE_OK || val <= 0) { + return REDISMODULE_ERR; } - return result; + ThreadPoolSizePerQueue = val; + return REDISMODULE_OK; } -/** - * Set number of threads used for parallelism between independent operations, by - * backend. - * - * @param num_threads_string string containing thread number - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed - */ -int RedisAI_Config_InterOperationParallelism(RedisModuleString *num_threads_string) { - long long temp; - int result = RedisModule_StringToLongLong(num_threads_string, &temp); - if (result == REDISMODULE_OK) { - result = setBackendsInterOpParallelism(temp); +int Config_SetInterOperationParallelism(RedisModuleString *num_threads_string) { + long long val; + int result = RedisModule_StringToLongLong(num_threads_string, &val); + if (result != REDISMODULE_OK || val <= 0) { + return REDISMODULE_ERR; } - return result; + BackendsInterOpParallelism = val; + return REDISMODULE_OK; } -/** - * Set number of threads used within an individual op for parallelism, by - * backend. - * - * @param num_threads_string string containing thread number - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed - */ -int RedisAI_Config_IntraOperationParallelism(RedisModuleString *num_threads_string) { - long long temp; - int result = RedisModule_StringToLongLong(num_threads_string, &temp); - if (result == REDISMODULE_OK) { - result = setBackendsIntraOpParallelism(temp); +int Config_SetIntraOperationParallelism(RedisModuleString *num_threads_string) { + long long val; + int result = RedisModule_StringToLongLong(num_threads_string, &val); + if (result != REDISMODULE_OK || val <= 0) { + return REDISMODULE_ERR; } - return result; + BackendsIntraOpParallelism = val; + return REDISMODULE_OK; } -/** - * Set size of chunks in which model payloads are split for set, - * get, serialization and replication. 
- * - * @param chunk_size_string string containing chunk size (in bytes) - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed - */ -int RedisAI_Config_ModelChunkSize(RedisModuleString *chunk_size_string) { - long long temp; - int result = RedisModule_StringToLongLong(chunk_size_string, &temp); - // make sure chunk size is a positive integer - // if not set the value to the default - if (result == REDISMODULE_OK && temp < 1) { - temp = REDISAI_DEFAULT_MODEL_CHUNK_SIZE; - result = REDISMODULE_ERR; +int Config_SetModelChunkSize(RedisModuleString *chunk_size_string) { + long long val; + int result = RedisModule_StringToLongLong(chunk_size_string, &val); + if (result != REDISMODULE_OK || val <= 0) { + return REDISMODULE_ERR; } - result = setModelChunkSize(temp); - return result; + ModelChunkSize = val; + return REDISMODULE_OK; } -/** - * - * @param ctx Context in which Redis modules operate - * @param key - * @param val - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed - */ -int RAI_configParamParse(RedisModuleCtx *ctx, const char *key, const char *val, - RedisModuleString *rsval) { - int ret = REDISMODULE_OK; - if (strcasecmp((key), "TF") == 0) { - ret = RAI_LoadBackend(ctx, RAI_BACKEND_TENSORFLOW, (val)); - } else if (strcasecmp((key), "TFLITE") == 0) { - ret = RAI_LoadBackend(ctx, RAI_BACKEND_TFLITE, (val)); - } else if (strcasecmp((key), "TORCH") == 0) { - ret = RAI_LoadBackend(ctx, RAI_BACKEND_TORCH, (val)); - } else if (strcasecmp((key), "ONNX") == 0) { - ret = RAI_LoadBackend(ctx, RAI_BACKEND_ONNXRUNTIME, (val)); +int Config_SetModelExecutionTimeout(RedisModuleString *timeout) { + long long val; + int result = RedisModule_StringToLongLong(timeout, &val); + // Timeout should not be lower than the time passing between two consecutive + // runs of Redis cron callback, that is no more than (1/CONFIG_MIN_HZ) + if (result != REDISMODULE_OK || val < 1000) { + return REDISMODULE_ERR; } - // enable configuring the main thread to create a fixed number of worker - // threads up front per device. 
by default we'll use 1 - else if (strcasecmp((key), "THREADS_PER_QUEUE") == 0) { - ret = RedisAI_Config_QueueThreads(rsval); - if (ret == REDISMODULE_OK) { - RedisModule_Log(ctx, "notice", "%s: %s", REDISAI_INFOMSG_THREADS_PER_QUEUE, (val)); - } - } else if (strcasecmp((key), "INTRA_OP_PARALLELISM") == 0) { - ret = RedisAI_Config_IntraOperationParallelism(rsval); - if (ret == REDISMODULE_OK) { - RedisModule_Log(ctx, "notice", "%s: %lld", REDISAI_INFOMSG_INTRA_OP_PARALLELISM, - getBackendsIntraOpParallelism()); - } - } else if (strcasecmp((key), "INTER_OP_PARALLELISM") == 0) { - ret = RedisAI_Config_InterOperationParallelism(rsval); - if (ret == REDISMODULE_OK) { - RedisModule_Log(ctx, "notice", "%s: %lld", REDISAI_INFOMSG_INTER_OP_PARALLELISM, - getBackendsInterOpParallelism()); - } - } else if (strcasecmp((key), "MODEL_CHUNK_SIZE") == 0) { - ret = RedisAI_Config_ModelChunkSize(rsval); - if (ret == REDISMODULE_OK) { - RedisModule_Log(ctx, "notice", "%s: %lld", REDISAI_INFOMSG_MODEL_CHUNK_SIZE, - getModelChunkSize()); - } - } else if (strcasecmp((key), "BACKENDSPATH") == 0) { - // already taken care of - } else { - ret = REDISMODULE_ERR; - } - return ret; + ModelExecutionTimeout = val; + return REDISMODULE_OK; } -/** - * Load time configuration parser - * - * @param ctx Context in which Redis modules operate - * @param argv Redis command arguments, as an array of strings - * @param argc Redis command number of arguments - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if the DAGRUN failed - */ -int RAI_loadTimeConfig(RedisModuleCtx *ctx, RedisModuleString *const *argv, int argc) { +int Config_SetLoadTimeParams(RedisModuleCtx *ctx, RedisModuleString *const *argv, int argc) { if (argc > 0 && argc % 2 != 0) { RedisModule_Log(ctx, "warning", "Even number of arguments provided to module. 
Please " "provide arguments as KEY VAL pairs"); + return REDISMODULE_ERR; } // need BACKENDSPATH set up before loading specific backends for (int i = 0; i < argc / 2; i++) { const char *key = RedisModule_StringPtrLen(argv[2 * i], NULL); const char *val = RedisModule_StringPtrLen(argv[2 * i + 1], NULL); - - int ret = REDISMODULE_OK; if (strcasecmp(key, "BACKENDSPATH") == 0) { - ret = RedisAI_Config_BackendsPath(ctx, val); + Config_SetBackendsPath(val); } } for (int i = 0; i < argc / 2; i++) { const char *key = RedisModule_StringPtrLen(argv[2 * i], NULL); const char *val = RedisModule_StringPtrLen(argv[2 * i + 1], NULL); - int ret = RAI_configParamParse(ctx, key, val, argv[2 * i + 1]); + int ret = _Config_LoadTimeParamParse(ctx, key, val, argv[2 * i + 1]); if (ret == REDISMODULE_ERR) { char *buffer = RedisModule_Alloc( @@ -302,6 +192,5 @@ int RAI_loadTimeConfig(RedisModuleCtx *ctx, RedisModuleString *const *argv, int return ret; } } - return REDISMODULE_OK; } diff --git a/src/config/config.h b/src/config/config.h index d8fc461fe..724cc509a 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -16,71 +16,50 @@ typedef enum { typedef enum { RAI_DEVICE_CPU = 0, RAI_DEVICE_GPU = 1 } RAI_Device; -//#define RAI_COPY_RUN_INPUT #define RAI_COPY_RUN_OUTPUT #define RAI_PRINT_BACKEND_ERRORS -#define REDISAI_DEFAULT_THREADS_PER_QUEUE 1 -#define REDISAI_DEFAULT_INTRA_OP_PARALLELISM 0 -#define REDISAI_DEFAULT_INTER_OP_PARALLELISM 0 -#define REDISAI_DEFAULT_MODEL_CHUNK_SIZE 535822336 // (511 * 1024 * 1024) -#define REDISAI_ERRORMSG_PROCESSING_ARG "ERR error processing argument" -#define REDISAI_ERRORMSG_THREADS_PER_QUEUE "ERR error setting THREADS_PER_QUEUE to" -#define REDISAI_ERRORMSG_INTRA_OP_PARALLELISM "ERR error setting INTRA_OP_PARALLELISM to" -#define REDISAI_ERRORMSG_INTER_OP_PARALLELISM "ERR error setting INTER_OP_PARALLELISM to" - -#define REDISAI_INFOMSG_THREADS_PER_QUEUE "Setting THREADS_PER_QUEUE parameter to" -#define REDISAI_INFOMSG_INTRA_OP_PARALLELISM "Setting INTRA_OP_PARALLELISM parameter to" -#define REDISAI_INFOMSG_INTER_OP_PARALLELISM "Setting INTER_OP_PARALLELISM parameter to" -#define REDISAI_INFOMSG_MODEL_CHUNK_SIZE "Setting MODEL_CHUNK_SIZE parameter to" + +#define REDISAI_ERRORMSG_PROCESSING_ARG "ERR error processing argument" + +#define REDISAI_INFOMSG_THREADS_PER_QUEUE "Setting THREADS_PER_QUEUE parameter to" +#define REDISAI_INFOMSG_INTRA_OP_PARALLELISM "Setting INTRA_OP_PARALLELISM parameter to" +#define REDISAI_INFOMSG_INTER_OP_PARALLELISM "Setting INTER_OP_PARALLELISM parameter to" +#define REDISAI_INFOMSG_MODEL_CHUNK_SIZE "Setting MODEL_CHUNK_SIZE parameter to" +#define REDISAI_INFOMSG_MODEL_EXECUTION_TIMEOUT "Setting MODEL_EXECUTION_TIMEOUT parameter to" /** * Get number of threads used for parallelism between independent operations, by * backend. - * @return number of threads used for parallelism between independent - * operations, by backend */ -long long getBackendsInterOpParallelism(); +long long Config_GetBackendsInterOpParallelism(void); /** - * Set number of threads used for parallelism between independent operations, by + * Get number of threads used within an individual op for parallelism, by * backend. - * - * @param num_threads - * @return 0 on success, or 1 if failed */ -int setBackendsInterOpParallelism(long long num_threads); +long long Config_GetBackendsIntraOpParallelism(void); /** - * Get number of threads used within an individual op for parallelism, by - * backend. 
- * @return number of threads used within an individual op for parallelism, by - * backend. + * @return size of chunks (in bytes) in which models are split for + * set, get, serialization and replication. */ -long long getBackendsIntraOpParallelism(); +long long Config_GetModelChunkSize(void); /** - * Set number of threads used within an individual op for parallelism, by - * backend. - * - * @param num_threads - * @return 0 on success, or 1 if failed + * @brief Return the number of working threads per device in RedisAI. */ -int setBackendsIntraOpParallelism(long long num_threads); +long long Config_GetNumThreadsPerQueue(void); /** - * @return size of chunks (in bytes) in which models are split for - * set, get, serialization and replication. + * @return Number of milliseconds that a model session is allowed to run + * before killing it. Currently supported only for onnxruntime backend. */ -long long getModelChunkSize(); +long long Config_GetModelExecutionTimeout(void); /** - * Set size of chunks (in bytes) in which models are split for set, - * get, serialization and replication. - * - * @param size - * @return 0 on success, or 1 if failed + * @return Returns the backends path string. */ -int setModelChunkSize(long long size); +char *Config_GetBackendsPath(void); /** * Helper method for AI.CONFIG LOADBACKEND @@ -89,72 +68,61 @@ int setModelChunkSize(long long size); * @param ctx Context in which Redis modules operate * @param argv Redis command arguments, as an array of strings * @param argc Redis command number of arguments - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if the DAGRUN failed + * @return REDISMODULE_OK on success, or REDISMODULE_ERR otherwise. */ -int RedisAI_Config_LoadBackend(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); +int Config_LoadBackend(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); /** * Helper method for AI.CONFIG BACKENDSPATH * - * - * @param ctx Context in which Redis modules operate * @param path string containing backend path - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if the DAGRUN failed */ -int RedisAI_Config_BackendsPath(RedisModuleCtx *ctx, const char *path); +void Config_SetBackendsPath(const char *path); /** * Set number of threads used for parallelism between RedisAI independent - * blocking commands ( AI.DAGRUN, AI.SCRIPTRUN, AI.MODELRUN ). - * + * blocking commands (AI.DAGEXECUTE, AI.SCRIPTEXECUTE, AI.MODELEXECUTE). * @param num_threads_string string containing thread number - * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed + * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed */ -int RedisAI_Config_QueueThreads(RedisModuleString *num_threads_string); +int Config_SetQueueThreadsNum(RedisModuleString *num_threads_string); /** * Set number of threads used for parallelism between independent operations, by * backend. - * * @param num_threads_string string containing thread number * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed */ -int RedisAI_Config_InterOperationParallelism(RedisModuleString *num_threads_string); +int Config_SetInterOperationParallelism(RedisModuleString *num_threads_string); /** * Set number of threads used within an individual op for parallelism, by * backend. 
- *
  * @param num_threads_string string containing thread number
  * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
  */
-int RedisAI_Config_IntraOperationParallelism(RedisModuleString *num_threads_string);
+int Config_SetIntraOperationParallelism(RedisModuleString *num_threads_string);

 /**
  * Set size of chunks in which model payloads are split for set,
  * get, serialization and replication.
- *
  * @param chunk_size_string string containing chunk size (in bytes)
- * @return REDISMODULE_OK on success, or REDISMODULE_ERR if  failed
+ * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
  */
-int RedisAI_Config_ModelChunkSize(RedisModuleString *chunk_size_string);
+int Config_SetModelChunkSize(RedisModuleString *chunk_size_string);

 /**
- *
- * @param ctx Context in which Redis modules operate
- * @param key
- * @param val
+ * Set the maximum time in ms that the onnx backend allows a model to run.
+ * @param timeout - string containing the max runtime (in ms)
  * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
  */
-int RAI_configParamParse(RedisModuleCtx *ctx, const char *key, const char *val,
-                         RedisModuleString *rsval);
+int Config_SetModelExecutionTimeout(RedisModuleString *timeout);

 /**
  * Load time configuration parser
- *
  * @param ctx Context in which Redis modules operate
  * @param argv Redis command arguments, as an array of strings
  * @param argc Redis command number of arguments
- * @return REDISMODULE_OK on success, or REDISMODULE_ERR if the DAGRUN failed
+ * @return REDISMODULE_OK on success, or REDISMODULE_ERR if failed
  */
-int RAI_loadTimeConfig(RedisModuleCtx *ctx, RedisModuleString *const *argv, int argc);
+int Config_SetLoadTimeParams(RedisModuleCtx *ctx, RedisModuleString *const *argv, int argc);
diff --git a/src/execution/DAG/dag_execute.c b/src/execution/DAG/dag_execute.c
index e676581a7..9a6dd2454 100644
--- a/src/execution/DAG/dag_execute.c
+++ b/src/execution/DAG/dag_execute.c
@@ -1,4 +1,5 @@
 #include
+#include
 #include "dag_execute.h"
 #include "util/string_utils.h"
 #include "execution/run_info.h"
@@ -105,20 +106,8 @@ int DAG_InsertDAGToQueue(RedisAI_RunInfo *rinfo) {
     RunQueueInfo **run_queues_info = array_new(RunQueueInfo *, ndevices);

     for (long long i = 0; i < ndevices; i++) {
-        const char *devicestr = devices[i];
-        RunQueueInfo *run_queue_info = NULL;
-        if (ensureRunQueue(devicestr, &run_queue_info) == REDISMODULE_ERR) {
-            // A device run queue was not created properly, so we free everything,
-            // set an error and finish.
-            array_free(devices);
-            for (int j = 0; j < ndevices; j++) {
-                RAI_DagRunInfoFreeShallowCopy(rinfo_copies[j]);
-            }
-            array_free(rinfo_copies);
-            array_free(run_queues_info);
-            RAI_SetError(rinfo->err, RAI_EDAGRUN, "ERR Queue not initialized for device");
-            return REDISMODULE_ERR;
-        }
+        const char *device_str = devices[i];
+        RunQueueInfo *run_queue_info = RunQueue_GetInfo(device_str);
         run_queues_info = array_append(run_queues_info, run_queue_info);
     }
     for (long long i = 0; i < ndevices; i++) {
diff --git a/src/execution/background_workers.c b/src/execution/background_workers.c
index 2ea8701c8..1c7c9d3e1 100644
--- a/src/execution/background_workers.c
+++ b/src/execution/background_workers.c
@@ -1,24 +1,7 @@
-/**
- * background_workers.c
- *
- * Contains the structure to manage the per-device queues, used for decoupling
- * the work from the main thread to the background worker threads. For each of
- * the incoming ModelRun, ScriptRun, and DagRun commands, the request is queued
- * and evaded asynchronously to one the device queues.
- *
- */
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include "redisai.h"
+#include "sys/time.h"
 #include "run_info.h"
-#include "background_workers.h"
+#include "run_queue_info.h"
+#include "execution/DAG/dag.h"

 /* Define for RedisAI thread name setter */
 #ifdef __linux__
@@ -38,75 +21,18 @@ int pthread_setname_np(const char *name);
 #endif
 #endif

-int freeRunQueueInfo(RunQueueInfo *info) {
-    int result = REDISMODULE_OK;
-    if (info->run_queue) {
-        RedisModule_Free(info->run_queue);
-    }
-    RedisModule_Free(info->devicestr);
-    if (info->threads) {
-        /* Wait for workers to exit */
-        for (int i = 0; i < perqueueThreadPoolSize; i++) {
-            const int rtn = pthread_join(info->threads[i], NULL);
-            if (rtn != 0) {
-                result = REDISMODULE_ERR;
-            }
-        }
-        /* Now free pool structure */
-        RedisModule_Free(info->threads);
-    }
-    RedisModule_Free(info);
-    return result;
-}
-
-void *RedisAI_Run_ThreadMain(void *arg);
-
-char *strToUpper(const char *input) {
-    char *output = RedisModule_Strdup(input);
-    size_t output_len = strlen(output);
-    for (long long i = 0; i < output_len; i++) {
-        output[i] = toupper(output[i]);
-    }
-    return output;
-}
-
-/* Ensure that the the run queue for the device exists.
- * If not, create it. */
-int ensureRunQueue(const char *devicestr, RunQueueInfo **run_queue_info) {
-    int result = REDISMODULE_ERR;
-    if (run_queues == NULL) {
-        return result;
-    }
-
-    char *devicestr_ = strToUpper(devicestr);
-
-    AI_dictEntry *entry = AI_dictFind(run_queues, devicestr_);
-    if (entry) {
-        *run_queue_info = AI_dictGetVal(entry);
-        result = REDISMODULE_OK;
-    } else {
-        *run_queue_info = RedisModule_Alloc(sizeof(RunQueueInfo));
-        (*run_queue_info)->run_queue = queueCreate();
-        (*run_queue_info)->devicestr = RedisModule_Strdup(devicestr_);
-        pthread_cond_init(&(*run_queue_info)->queue_condition_var, NULL);
-        pthread_mutex_init(&(*run_queue_info)->run_queue_mutex, NULL);
-        (*run_queue_info)->threads =
-            (pthread_t *)RedisModule_Alloc(sizeof(pthread_t) * perqueueThreadPoolSize);
-        /* create threads */
-        for (int i = 0; i < perqueueThreadPoolSize; i++) {
-            if (pthread_create(&((*run_queue_info)->threads[i]), NULL, RedisAI_Run_ThreadMain,
-                               *run_queue_info) != 0) {
-                freeRunQueueInfo(*run_queue_info);
-                return REDISMODULE_ERR;
-            }
-        }
-        AI_dictAdd(run_queues, (void *)devicestr_, (void *)*run_queue_info);
-        result = REDISMODULE_OK;
-    }
+uintptr_t BGWorkersCounter; // Total number of BG threads running currently.
+pthread_key_t ThreadIdKey;  // Key to hold thread id in its local storage.

-    RedisModule_Free(devicestr_);
-
-    return result;
+/**
+ * @brief Save the id for some working thread in thread local storage.
+ */
+static void _BGWorker_SaveThreadId() {
+    // Let the current thread have the next available id, and increase the counter.
+    long id_value = __atomic_add_fetch(&BGWorkersCounter, 1, __ATOMIC_RELAXED);
+    // Convert the id value to a pointer and store it in the thread local storage.
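+    // Note: ids are handed out in arrival order and are never reused; the counter
+    // only grows as new worker threads are created.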
+    // First id is 1, so we won't confuse it with NULL (which is the error return value)
+    pthread_setspecific(ThreadIdKey, (const void *)id_value);
 }

 /**
@@ -178,9 +104,9 @@ static void _BGThread_Execute(RunQueueInfo *run_queue_info, RedisAI_RunInfo **ba
         // For simplicity, we call into different functions whether the run
         // is batched or not
         if (batched_run) {
-            RedisAI_BatchedDagRunSessionStep(batch_rinfo, run_queue_info->devicestr);
+            RedisAI_BatchedDagRunSessionStep(batch_rinfo, run_queue_info->device_str);
         } else {
-            RedisAI_DagRunSessionStep(batch_rinfo[0], run_queue_info->devicestr);
+            RedisAI_DagRunSessionStep(batch_rinfo[0], run_queue_info->device_str);
         }
     }
 }
@@ -327,9 +253,19 @@ static bool _BGThread_PrepareExecution(RunQueueInfo *run_queue_info, RedisAI_Run
     return true;
 }

-void *RedisAI_Run_ThreadMain(void *arg) {
+long BGWorker_GetThreadId() {
+    void *thread_id = pthread_getspecific(ThreadIdKey);
+
+    // Return the 0-based id. If thread_id is NULL, we return -1 to indicate that
+    // the caller is not a RedisAI thread.
+    return (long)(thread_id)-1;
+}
+
+uintptr_t BGWorker_GetThreadsCount() { return BGWorkersCounter; }
+
+void *BGWorker_ThreadMain(void *arg) {
+    _BGWorker_SaveThreadId();
     RunQueueInfo *run_queue_info = (RunQueueInfo *)arg;
-    RAI_PTHREAD_SETNAME("redisai_bthread");

     RedisAI_RunInfo **batch_rinfo = array_new(RedisAI_RunInfo *, 1);
     pthread_mutex_lock(&run_queue_info->run_queue_mutex);
diff --git a/src/execution/background_workers.h b/src/execution/background_workers.h
index 43d7099b0..b6e8eacb6 100644
--- a/src/execution/background_workers.h
+++ b/src/execution/background_workers.h
@@ -27,22 +27,22 @@
 #include "redis_ai_objects/stats.h"
 #include "redis_ai_objects/tensor.h"
 #include "util/arr.h"
-#include "util/dict.h"
 #include "util/queue.h"

-AI_dict *run_queues;
-long long perqueueThreadPoolSize;
-
-typedef struct RunQueueInfo {
-    pthread_mutex_t run_queue_mutex;
-    pthread_cond_t queue_condition_var;
-    queue *run_queue;
-    pthread_t *threads;
-    char *devicestr;
-} RunQueueInfo;
+/**
+ * @brief RedisAI main loop for every background working thread
+ * @param arg - This is the run queue info of the device on which this thread is
+ * running the AI model/script
+ */
+void *BGWorker_ThreadMain(void *arg);

-int freeRunQueueInfo(RunQueueInfo *info);
+/**
+ * @brief Returns the thread id (among RedisAI working threads). If this is called
+ * from a non-RedisAI working thread, return -1
+ */
+long BGWorker_GetThreadId(void);

-/* Ensure that the the run queue for the device exists.
- * If not, create it. */
-int ensureRunQueue(const char *devicestr, RunQueueInfo **run_queue_info);
+/**
+ * @brief Returns the total number of RedisAI working threads (for all devices).
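+ * This count can only grow: a new batch of THREADS_PER_QUEUE threads is added
+ * whenever a model is stored on a device that has no run queue yet.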
+ */
+uintptr_t BGWorker_GetThreadsCount(void);
\ No newline at end of file
diff --git a/src/execution/parsing/deprecated.c b/src/execution/parsing/deprecated.c
index 5dfe78b21..3d95bc42c 100644
--- a/src/execution/parsing/deprecated.c
+++ b/src/execution/parsing/deprecated.c
@@ -1,4 +1,3 @@
-
 #include "deprecated.h"
 #include "rmutil/args.h"
 #include "backends/backends.h"
@@ -6,11 +5,12 @@
 #include "redis_ai_objects/stats.h"
 #include "execution/utils.h"

+#include
 #include "execution/DAG/dag_builder.h"
 #include "execution/DAG/dag_execute.h"
-#include "execution/background_workers.h"
 #include "execution/parsing/dag_parser.h"
 #include "execution/parsing/parse_utils.h"
+
 #include "execution/execution_contexts/modelRun_ctx.h"
 #include "execution/execution_contexts/scriptRun_ctx.h"

@@ -236,8 +236,8 @@ int ModelSetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
         .batchsize = batchsize,
         .minbatchsize = minbatchsize,
         .minbatchtimeout = minbatchtimeout,
-        .backends_intra_op_parallelism = getBackendsIntraOpParallelism(),
-        .backends_inter_op_parallelism = getBackendsInterOpParallelism(),
+        .backends_intra_op_parallelism = Config_GetBackendsIntraOpParallelism(),
+        .backends_inter_op_parallelism = Config_GetBackendsInterOpParallelism(),
     };

     RAI_Model *model = NULL;
@@ -305,17 +305,12 @@ int ModelSetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
     }

     // TODO: if backend loaded, make sure there's a queue
-    RunQueueInfo *run_queue_info = NULL;
-    if (ensureRunQueue(devicestr, &run_queue_info) != REDISMODULE_OK) {
-        RAI_ModelFree(model, &err);
-        if (err.code != RAI_OK) {
-            RedisModule_Log(ctx, "warning", "%s", err.detail);
-            int ret = RedisModule_ReplyWithError(ctx, err.detail_oneline);
-            RAI_ClearError(&err);
-            return ret;
+    if (!RunQueue_IsExists(devicestr)) {
+        RunQueueInfo *run_queue_info = RunQueue_Create(devicestr);
+        if (run_queue_info == NULL) {
+            RAI_ModelFree(model, &err);
+            return RedisModule_ReplyWithError(ctx, "ERR Could not initialize queue on requested device");
         }
-        return RedisModule_ReplyWithError(ctx,
-                                          "ERR Could not initialize queue on requested device");
     }

     RedisModuleKey *key = RedisModule_OpenKey(ctx, keystr, REDISMODULE_READ | REDISMODULE_WRITE);
diff --git a/src/execution/run_queue_info.c b/src/execution/run_queue_info.c
new file mode 100644
index 000000000..7ef559da6
--- /dev/null
+++ b/src/execution/run_queue_info.c
@@ -0,0 +1,72 @@
+#include "string_utils.h"
+#include "run_queue_info.h"
+#include "backends/backends.h"
+#include "background_workers.h"
+
+RunQueueInfo *RunQueue_Create(const char *device_str) {
+
+    size_t device_str_len = strlen(device_str);
+    char upper_device_str[device_str_len + 1];
+    RAI_StringToUpper(device_str, upper_device_str, device_str_len + 1);
+
+    // Create new run queue and initialize its inner fields.
+    RunQueueInfo *run_queue_info = RedisModule_Alloc(sizeof(RunQueueInfo));
+    run_queue_info->run_queue = queueCreate();
+    run_queue_info->device_str = RedisModule_Strdup(upper_device_str);
+    pthread_cond_init(&(run_queue_info->queue_condition_var), NULL);
+    pthread_mutex_init(&(run_queue_info->run_queue_mutex), NULL);
+    run_queue_info->threads = array_new(pthread_t, Config_GetNumThreadsPerQueue());
+    // Save the device with its associated run queue info in the dictionary.
+    if (AI_dictAdd(RunQueues, upper_device_str, run_queue_info) != DICT_OK) {
+        RunQueue_Free(run_queue_info);
+        return NULL;
+    }
+
+    // Create worker threads.
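+    // The queue entry is already registered in RunQueues at this point; if thread
+    // creation fails below, it is removed from the dict before the info is freed.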
+    for (int i = 0; i < Config_GetNumThreadsPerQueue(); i++) {
+        pthread_t thread;
+        if (pthread_create(&thread, NULL, BGWorker_ThreadMain, run_queue_info) != 0) {
+            AI_dictDelete(RunQueues, upper_device_str);
+            RunQueue_Free(run_queue_info);
+            return NULL;
+        }
+        run_queue_info->threads = array_append(run_queue_info->threads, thread);
+    }
+
+    // Add the new device worker threads to onnx run sessions tracking.
+    if (RAI_backends.onnx.add_new_device_cb) {
+        RAI_backends.onnx.add_new_device_cb(device_str);
+    }
+    return run_queue_info;
+}
+
+RunQueueInfo *RunQueue_GetInfo(const char *device_str) {
+    size_t device_str_len = strlen(device_str);
+    char upper_device_str[device_str_len + 1];
+    RAI_StringToUpper(device_str, upper_device_str, device_str_len + 1);
+    AI_dictEntry *entry = AI_dictFind(RunQueues, upper_device_str);
+    RedisModule_Assert(entry != NULL);
+    return AI_dictGetVal(entry);
+}
+
+bool RunQueue_IsExists(const char *device_str) {
+    size_t device_str_len = strlen(device_str);
+    char upper_device_str[device_str_len + 1];
+    RAI_StringToUpper(device_str, upper_device_str, device_str_len + 1);
+    return AI_dictFind(RunQueues, upper_device_str) != NULL;
+}
+
+void RunQueue_Free(RunQueueInfo *run_queue_info) {
+    RedisModule_Assert(queueLength(run_queue_info->run_queue) == 0);
+    RedisModule_Free(run_queue_info->run_queue);
+    RedisModule_Free(run_queue_info->device_str);
+
+    // Wait for all workers to exit, then free the pool (once, outside the loop).
+    for (int i = 0; i < array_len(run_queue_info->threads); i++) {
+        RedisModule_Assert(pthread_join(run_queue_info->threads[i], NULL) == 0);
+    }
+    array_free(run_queue_info->threads);
+    pthread_mutex_destroy(&(run_queue_info->run_queue_mutex));
+    pthread_cond_destroy(&(run_queue_info->queue_condition_var));
+    RedisModule_Free(run_queue_info);
+}
diff --git a/src/execution/run_queue_info.h b/src/execution/run_queue_info.h
new file mode 100644
index 000000000..da898a267
--- /dev/null
+++ b/src/execution/run_queue_info.h
@@ -0,0 +1,43 @@
+#pragma once
+
+/**
+ * Contains the structure to manage the per-device queues, used for decoupling
+ * the work from the main thread to the background worker threads. For each of
+ * the incoming ModelRun, ScriptRun, and DagRun commands, the request is queued
+ * and dispatched asynchronously to one of the device queues.
+ */
+
+#include "utils.h"
+#include "queue.h"
+#include "dictionaries.h"
+
+AI_dict *RunQueues;
+
+typedef struct RunQueueInfo {
+    pthread_mutex_t run_queue_mutex;
+    pthread_cond_t queue_condition_var;
+    queue *run_queue;
+    pthread_t *threads;
+    char *device_str;
+} RunQueueInfo;
+
+/**
+ * @brief Create a new run queue for a device.
+ */
+RunQueueInfo *RunQueue_Create(const char *device_str);
+
+/**
+ * @brief Return true if a run queue exists for this particular device.
+ */
+bool RunQueue_IsExists(const char *device_str);
+
+/**
+ * @brief Return the RunQueueInfo saved in the global RunQueues dict for a certain
+ * device name, after asserting that it exists.
+ */
+RunQueueInfo *RunQueue_GetInfo(const char *device_str);
+
+/**
+ * @brief Terminate all working threads and free the run queue with its inner fields.
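+ * The run queue itself must already be empty (this is asserted internally).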
+ */
+void RunQueue_Free(RunQueueInfo *info);
diff --git a/src/execution/utils.c b/src/execution/utils.c
index 723ca661f..07b0704eb 100644
--- a/src/execution/utils.c
+++ b/src/execution/utils.c
@@ -1,5 +1,4 @@
 #include "utils.h"
-#include "redis_ai_objects/tensor.h"
 #include "redis_ai_objects/model.h"

 int redisMajorVersion;
diff --git a/src/execution/utils.h b/src/execution/utils.h
index f4253a54c..afeeb1663 100644
--- a/src/execution/utils.h
+++ b/src/execution/utils.h
@@ -1,9 +1,7 @@
 #pragma once

-#include "redismodule.h"
-#include "redis_ai_objects/tensor_struct.h"
-#include "redis_ai_objects/model_struct.h"
-#include "redis_ai_objects/err.h"
+
 #include
+#include "redismodule.h"

 /** Use this to check if a command is given a key whose hash slot is not on the current
  * shard, when using enterprise cluster.
diff --git a/src/redis_ai_objects/tensor.c b/src/redis_ai_objects/tensor.c
index da95b6857..5c26b6242 100644
--- a/src/redis_ai_objects/tensor.c
+++ b/src/redis_ai_objects/tensor.c
@@ -14,7 +14,6 @@
 #include "tensor.h"
 #include "err.h"
 #include "arr.h"
-#include "math.h"
 #include "redisai.h"
 #include "version.h"
 #include "tensor_struct.h"
diff --git a/src/redisai.c b/src/redisai.c
index a7eb73f50..589566bbd 100644
--- a/src/redisai.c
+++ b/src/redisai.c
@@ -8,16 +8,14 @@
 #include "redis_ai_objects/tensor.h"
 #include "execution/command_parser.h"
 #include "backends/backends.h"
-#include "backends/util.h"
-#include "execution/utils.h"
-#include "execution/background_workers.h"
 #include "execution/DAG/dag.h"
 #include "execution/DAG/dag_builder.h"
 #include "execution/DAG/dag_execute.h"
+#include "execution/utils.h"
 #include "execution/parsing/deprecated.h"
-#include "redis_ai_objects/model.h"
 #include "execution/execution_contexts/modelRun_ctx.h"
 #include "execution/execution_contexts/scriptRun_ctx.h"
+#include "redis_ai_objects/model.h"
 #include "redis_ai_objects/script.h"
 #include "redis_ai_objects/stats.h"
 #include
@@ -26,6 +24,7 @@
 #include
 #include
 #include
+#include

 #include "rmutil/alloc.h"
 #include "rmutil/args.h"
@@ -62,6 +61,8 @@ extern int rlecMinorVersion;
 extern int rlecPatchVersion;
 extern int rlecBuild;

+extern pthread_key_t ThreadIdKey;
+
 /* ----------------------- RedisAI Module Commands ------------------------- */

 /**
@@ -233,8 +234,8 @@ int RedisAI_ModelStore_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg
         .batchsize = batchsize,
         .minbatchsize = minbatchsize,
         .minbatchtimeout = minbatchtimeout,
-        .backends_intra_op_parallelism = getBackendsIntraOpParallelism(),
-        .backends_inter_op_parallelism = getBackendsInterOpParallelism(),
+        .backends_intra_op_parallelism = Config_GetBackendsIntraOpParallelism(),
+        .backends_inter_op_parallelism = Config_GetBackendsInterOpParallelism(),
     };

     if (AC_IsAtEnd(&ac)) {
@@ -352,17 +353,12 @@ int RedisAI_ModelStore_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg
     }

     // TODO: if backend loaded, make sure there's a queue
-    RunQueueInfo *run_queue_info = NULL;
-    if (ensureRunQueue(devicestr, &run_queue_info) != REDISMODULE_OK) {
-        RAI_ModelFree(model, &err);
-        if (err.code != RAI_OK) {
-            RedisModule_Log(ctx, "warning", "%s", err.detail);
-            int ret = RedisModule_ReplyWithError(ctx, err.detail_oneline);
-            RAI_ClearError(&err);
-            return ret;
+    if (!RunQueue_IsExists(devicestr)) {
+        RunQueueInfo *run_queue_info = RunQueue_Create(devicestr);
+        if (run_queue_info == NULL) {
+            RAI_ModelFree(model, &err);
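+            // Stop here so that the model is not stored without a live run queue for its device.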
+            return RedisModule_ReplyWithError(ctx, "ERR Could not initialize queue on requested device");
         }
-        return RedisModule_ReplyWithError(ctx,
-                                          "ERR Could not initialize queue on requested device");
     }

     RedisModuleKey *key = RedisModule_OpenKey(ctx, keystr, REDISMODULE_READ | REDISMODULE_WRITE);
@@ -395,7 +391,7 @@ int RedisAI_ModelStore_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg
 }

 void RAI_ReplyWithChunks(RedisModuleCtx *ctx, const char *buffer, long long len) {
-    long long chunk_size = getModelChunkSize();
+    long long chunk_size = Config_GetModelChunkSize();
     const size_t n_chunks = len / chunk_size + 1;
     if (n_chunks > 1) {
         RedisModule_ReplyWithArray(ctx, (long)n_chunks);
@@ -467,7 +463,7 @@ int RedisAI_ModelGet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
     RedisModule_ReplyWithArray(ctx, outentries);

     RedisModule_ReplyWithCString(ctx, "backend");
-    const char *backendstr = RAI_BackendName(mto->backend);
+    const char *backendstr = RAI_GetBackendName(mto->backend);
     RedisModule_ReplyWithCString(ctx, backendstr);

     RedisModule_ReplyWithCString(ctx, "device");
@@ -780,20 +776,12 @@ int RedisAI_ScriptSet_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv
         return ret;
     }

-    RunQueueInfo *run_queue_info = NULL;
-    // If the queue does not exist, initialize it
-    if (ensureRunQueue(devicestr, &run_queue_info) == REDISMODULE_ERR) {
-        RAI_ScriptFree(script, &err);
-        if (err.code != RAI_OK) {
-#ifdef RAI_PRINT_BACKEND_ERRORS
-            printf("ERR: %s\n", err.detail);
-#endif
-            int ret = RedisModule_ReplyWithError(ctx, err.detail_oneline);
-            RAI_ClearError(&err);
-            return ret;
+    if (!RunQueue_IsExists(devicestr)) {
+        RunQueueInfo *run_queue_info = RunQueue_Create(devicestr);
+        if (run_queue_info == NULL) {
+            RAI_ScriptFree(script, &err);
+            return RedisModule_ReplyWithError(ctx, "ERR Could not initialize queue on requested device");
         }
-        return RedisModule_ReplyWithError(ctx,
-                                          "ERR Could not initialize queue on requested device");
     }

     RedisModuleKey *key = RedisModule_OpenKey(ctx, keystr, REDISMODULE_READ | REDISMODULE_WRITE);
@@ -856,81 +844,13 @@ int RedisAI_ScriptScan_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **arg
     return REDISMODULE_OK;
 }

-void _RedisAI_Info(RedisModuleCtx *ctx) {
-    RedisModuleString *rai_version = RedisModule_CreateStringPrintf(
-        ctx, "%d.%d.%d", REDISAI_VERSION_MAJOR, REDISAI_VERSION_MINOR, REDISAI_VERSION_PATCH);
-    RedisModuleString *llapi_version =
-        RedisModule_CreateStringPrintf(ctx, "%d", REDISAI_LLAPI_VERSION);
-    RedisModuleString *rdb_version = RedisModule_CreateStringPrintf(ctx, "%llu", REDISAI_ENC_VER);
-
-    int reponse_len = 6;
-
-    if (RAI_backends.tf.get_version) {
-        reponse_len += 2;
-    }
-
-    if (RAI_backends.torch.get_version) {
-        reponse_len += 2;
-    }
-
-    if (RAI_backends.tflite.get_version) {
-        reponse_len += 2;
-    }
-
-    if (RAI_backends.onnx.get_version) {
-        reponse_len += 2;
-    }
-
-    RedisModule_ReplyWithArray(ctx, reponse_len);
-
-    RedisModule_ReplyWithSimpleString(ctx, "Version");
-    RedisModule_ReplyWithString(ctx, rai_version);
-
-    // TODO: Add Git SHA
-
-    RedisModule_ReplyWithSimpleString(ctx, "Low Level API Version");
-    RedisModule_ReplyWithString(ctx, llapi_version);
-
-    RedisModule_ReplyWithSimpleString(ctx, "RDB Encoding version");
-    RedisModule_ReplyWithString(ctx, llapi_version);
-
-    if (RAI_backends.tf.get_version) {
-        RedisModule_ReplyWithSimpleString(ctx, "TensorFlow version");
-        RedisModule_ReplyWithSimpleString(ctx, RAI_backends.tf.get_version());
-    }
-
-    if (RAI_backends.torch.get_version) {
-        RedisModule_ReplyWithSimpleString(ctx, "Torch version");
-        RedisModule_ReplyWithSimpleString(ctx, RAI_backends.torch.get_version());
-    }
-
-    if (RAI_backends.tflite.get_version)
{ - RedisModule_ReplyWithSimpleString(ctx, "TFLite version"); - RedisModule_ReplyWithSimpleString(ctx, RAI_backends.tflite.get_version()); - } - - if (RAI_backends.onnx.get_version) { - RedisModule_ReplyWithSimpleString(ctx, "ONNX version"); - RedisModule_ReplyWithSimpleString(ctx, RAI_backends.onnx.get_version()); - } - - RedisModule_FreeString(ctx, rai_version); - RedisModule_FreeString(ctx, llapi_version); - RedisModule_FreeString(ctx, rdb_version); -} - /** * AI.INFO [RESETSTAT] */ int RedisAI_Info_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc > 3) + if (argc < 2 || argc > 3) return RedisModule_WrongArity(ctx); - if (argc == 1) { - _RedisAI_Info(ctx); - return REDISMODULE_OK; - } - RedisModuleString *runkey = argv[1]; struct RedisAI_RunStats *rstats = NULL; if (RAI_GetRunStats(runkey, &rstats) == REDISMODULE_ERR) { @@ -957,7 +877,7 @@ int RedisAI_Info_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int RedisModule_ReplyWithCString(ctx, "SCRIPT"); } RedisModule_ReplyWithCString(ctx, "backend"); - RedisModule_ReplyWithCString(ctx, RAI_BackendName(rstats->backend)); + RedisModule_ReplyWithCString(ctx, RAI_GetBackendName(rstats->backend)); RedisModule_ReplyWithCString(ctx, "device"); RedisModule_ReplyWithCString(ctx, rstats->devicestr); RedisModule_ReplyWithCString(ctx, "tag"); @@ -969,7 +889,7 @@ int RedisAI_Info_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int RedisModule_ReplyWithCString(ctx, "duration"); RedisModule_ReplyWithLongLong(ctx, rstats->duration_us); RedisModule_ReplyWithCString(ctx, "samples"); - if (rstats->type == 0) { + if (rstats->type == RAI_MODEL) { RedisModule_ReplyWithLongLong(ctx, rstats->samples); } else { RedisModule_ReplyWithLongLong(ctx, -1); @@ -993,12 +913,13 @@ int RedisAI_Config_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, i const char *subcommand = RedisModule_StringPtrLen(argv[1], NULL); if (!strcasecmp(subcommand, "LOADBACKEND")) { - return RedisAI_Config_LoadBackend(ctx, argv + 1, argc - 1); + return Config_LoadBackend(ctx, argv + 1, argc - 1); } if (!strcasecmp(subcommand, "BACKENDSPATH")) { if (argc > 2) { - return RedisAI_Config_BackendsPath(ctx, RedisModule_StringPtrLen(argv[2], NULL)); + Config_SetBackendsPath(RedisModule_StringPtrLen(argv[2], NULL)); + return RedisModule_ReplyWithSimpleString(ctx, "OK"); } else { return RedisModule_ReplyWithError(ctx, "ERR BACKENDSPATH: missing path argument"); } @@ -1006,8 +927,11 @@ int RedisAI_Config_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, i if (!strcasecmp(subcommand, "MODEL_CHUNK_SIZE")) { if (argc > 2) { - RedisAI_Config_ModelChunkSize(argv[2]); - return RedisModule_ReplyWithSimpleString(ctx, "OK"); + if (Config_SetModelChunkSize(argv[2]) == REDISMODULE_OK) { + return RedisModule_ReplyWithSimpleString(ctx, "OK"); + } else { + return RedisModule_ReplyWithError(ctx, "ERR MODEL_CHUNK_SIZE: invalid chunk size"); + } } else { return RedisModule_ReplyWithError(ctx, "ERR MODEL_CHUNK_SIZE: missing chunk size"); } @@ -1216,23 +1140,59 @@ static int RedisAI_RegisterApi(RedisModuleCtx *ctx) { return REDISMODULE_OK; } -void RAI_moduleInfoFunc(RedisModuleInfoCtx *ctx, int for_crash_report) { - RedisModule_InfoAddSection(ctx, "git"); - RedisModule_InfoAddFieldCString(ctx, "git_sha", REDISAI_GIT_SHA); - RedisModule_InfoAddSection(ctx, "load_time_configs"); - RedisModule_InfoAddFieldLongLong(ctx, "threads_per_queue", perqueueThreadPoolSize); - RedisModule_InfoAddFieldLongLong(ctx, "inter_op_parallelism", 
getBackendsInterOpParallelism()); - RedisModule_InfoAddFieldLongLong(ctx, "intra_op_parallelism", getBackendsIntraOpParallelism()); - RedisModule_InfoAddSection(ctx, "memory_usage"); - if (RAI_backends.onnx.get_memory_info) { +static void _moduleInfo_getBackendsInfo(RedisModuleInfoCtx *ctx) { + RedisModule_InfoAddSection(ctx, "backends_info"); + if (RAI_backends.tf.get_version) { + RedisModule_InfoAddFieldCString(ctx, "TensorFlow_version", + (char *)RAI_backends.tf.get_version()); + } + if (RAI_backends.tflite.get_version) { + RedisModule_InfoAddFieldCString(ctx, "TensorFlowLite_version", + (char *)RAI_backends.tflite.get_version()); + } + if (RAI_backends.torch.get_version) { + RedisModule_InfoAddFieldCString(ctx, "Torch_version", + (char *)RAI_backends.torch.get_version()); + } + if (RAI_backends.onnx.get_version) { + RedisModule_InfoAddFieldCString(ctx, "onnxruntime_version", + (char *)RAI_backends.onnx.get_version()); RedisModule_InfoAddFieldULongLong(ctx, "onnxruntime_memory", RAI_backends.onnx.get_memory_info()); RedisModule_InfoAddFieldULongLong(ctx, "onnxruntime_memory_access_num", RAI_backends.onnx.get_memory_access_num()); - } else { - RedisModule_InfoAddFieldULongLong(ctx, "onnxruntime_memory", 0); - RedisModule_InfoAddFieldULongLong(ctx, "onnxruntime_memory_access_num", 0); + RedisModule_InfoAddFieldULongLong(ctx, "onnxruntime_maximum_run_sessions_number", + RAI_backends.onnx.get_max_run_sessions()); } +} + +void RAI_moduleInfoFunc(RedisModuleInfoCtx *ctx, int for_crash_report) { + RedisModule_InfoAddSection(ctx, "versions"); + RedisModuleString *rai_version = RedisModule_CreateStringPrintf( + NULL, "%d.%d.%d", REDISAI_VERSION_MAJOR, REDISAI_VERSION_MINOR, REDISAI_VERSION_PATCH); + RedisModuleString *llapi_version = + RedisModule_CreateStringPrintf(NULL, "%d", REDISAI_LLAPI_VERSION); + RedisModuleString *rdb_version = RedisModule_CreateStringPrintf(NULL, "%llu", REDISAI_ENC_VER); + + RedisModule_InfoAddFieldString(ctx, "RedisAI_version", rai_version); + RedisModule_InfoAddFieldString(ctx, "low_level_API_version", llapi_version); + RedisModule_InfoAddFieldString(ctx, "rdb_version", rdb_version); + + RedisModule_FreeString(NULL, rai_version); + RedisModule_FreeString(NULL, llapi_version); + RedisModule_FreeString(NULL, rdb_version); + + RedisModule_InfoAddSection(ctx, "git"); + RedisModule_InfoAddFieldCString(ctx, "git_sha", REDISAI_GIT_SHA); + RedisModule_InfoAddSection(ctx, "load_time_configs"); + RedisModule_InfoAddFieldLongLong(ctx, "threads_per_queue", Config_GetNumThreadsPerQueue()); + RedisModule_InfoAddFieldLongLong(ctx, "inter_op_parallelism", + Config_GetBackendsInterOpParallelism()); + RedisModule_InfoAddFieldLongLong(ctx, "intra_op_parallelism", + Config_GetBackendsIntraOpParallelism()); + RedisModule_InfoAddFieldLongLong(ctx, "model_execution_timeout", + Config_GetModelExecutionTimeout()); + _moduleInfo_getBackendsInfo(ctx); struct rusage self_ru, c_ru; // Return resource usage statistics for the calling process, @@ -1283,13 +1243,13 @@ void RAI_moduleInfoFunc(RedisModuleInfoCtx *ctx, int for_crash_report) { RedisModule_FreeString(NULL, main_thread_used_cpu_sys); RedisModule_FreeString(NULL, main_thread_used_cpu_user); - AI_dictIterator *iter = AI_dictGetSafeIterator(run_queues); + AI_dictIterator *iter = AI_dictGetSafeIterator(RunQueues); AI_dictEntry *entry = AI_dictNext(iter); while (entry) { char *queue_name = (char *)AI_dictGetKey(entry); RunQueueInfo *run_queue_info = (RunQueueInfo *)AI_dictGetVal(entry); if (run_queue_info) { - for (int i = 0; i < 
perqueueThreadPoolSize; i++) { + for (int i = 0; i < Config_GetNumThreadsPerQueue(); i++) { pthread_t current_bg_threads = run_queue_info->threads[i]; struct timespec ts; clockid_t cid; @@ -1472,22 +1432,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS); - // Default configs - RAI_BackendsPath = NULL; - perqueueThreadPoolSize = REDISAI_DEFAULT_THREADS_PER_QUEUE; - setBackendsInterOpParallelism(REDISAI_DEFAULT_INTER_OP_PARALLELISM); - setBackendsIntraOpParallelism(REDISAI_DEFAULT_INTRA_OP_PARALLELISM); - setModelChunkSize(REDISAI_DEFAULT_MODEL_CHUNK_SIZE); - - RAI_loadTimeConfig(ctx, argv, argc); - - run_queues = AI_dictCreate(&AI_dictTypeHeapStrings, NULL); - RunQueueInfo *run_queue_info = NULL; - if (ensureRunQueue("CPU", &run_queue_info) != REDISMODULE_OK) { - RedisModule_Log(ctx, "warning", "Queue not initialized for device CPU"); + if (Config_SetLoadTimeParams(ctx, argv, argc) != REDISMODULE_OK) { return REDISMODULE_ERR; } + RunQueues = AI_dictCreate(&AI_dictTypeHeapStrings, NULL); + pthread_key_create(&ThreadIdKey, NULL); + RunQueueInfo *cpu_run_queue_info = RunQueue_Create("CPU"); + if (cpu_run_queue_info == NULL) { + RedisModule_Log(ctx, "warning", "RedisAI could not initialize run queue for CPU"); + return REDISMODULE_ERR; + } run_stats = AI_dictCreate(&AI_dictTypeHeapRStrings, NULL); return REDISMODULE_OK; diff --git a/src/serialization/AOF/rai_aof_rewrite.c b/src/serialization/AOF/rai_aof_rewrite.c index 0120ef052..29d8dd6e3 100644 --- a/src/serialization/AOF/rai_aof_rewrite.c +++ b/src/serialization/AOF/rai_aof_rewrite.c @@ -46,7 +46,7 @@ void RAI_AOFRewriteModel(RedisModuleIO *aof, RedisModuleString *key, void *value // [INPUTS name1 name2 ... OUTPUTS name1 name2 ...] 
// BLOB model_blob - long long chunk_size = getModelChunkSize(); + long long chunk_size = Config_GetModelChunkSize(); const size_t n_chunks = len / chunk_size + 1; RedisModuleString **buffers_ = array_new(RedisModuleString *, n_chunks); @@ -60,7 +60,7 @@ void RAI_AOFRewriteModel(RedisModuleIO *aof, RedisModuleString *key, void *value RedisModule_Free(buffer); } - const char *backendstr = RAI_BackendName(model->backend); + const char *backendstr = RAI_GetBackendName(model->backend); if (model->backend != RAI_BACKEND_TENSORFLOW) { diff --git a/src/serialization/RDB/decoder/current/v2/decode_v2.c b/src/serialization/RDB/decoder/current/v2/decode_v2.c index 67aa043d9..714bcb9bf 100644 --- a/src/serialization/RDB/decoder/current/v2/decode_v2.c +++ b/src/serialization/RDB/decoder/current/v2/decode_v2.c @@ -111,8 +111,8 @@ void *RAI_RDBLoadModel_v2(RedisModuleIO *io) { .batchsize = batchsize, .minbatchsize = minbatchsize, .minbatchtimeout = minbatchtimeout, - .backends_intra_op_parallelism = getBackendsIntraOpParallelism(), - .backends_inter_op_parallelism = getBackendsInterOpParallelism(), + .backends_intra_op_parallelism = Config_GetBackendsIntraOpParallelism(), + .backends_inter_op_parallelism = Config_GetBackendsInterOpParallelism(), }; size_t len = RedisModule_LoadUnsigned(io); diff --git a/src/serialization/RDB/decoder/previous/v0/decode_v0.c b/src/serialization/RDB/decoder/previous/v0/decode_v0.c index 23c8a7e85..58cab96e5 100644 --- a/src/serialization/RDB/decoder/previous/v0/decode_v0.c +++ b/src/serialization/RDB/decoder/previous/v0/decode_v0.c @@ -105,8 +105,8 @@ void *RAI_RDBLoadModel_v0(RedisModuleIO *io) { RAI_ModelOpts opts = { .batchsize = batchsize, .minbatchsize = minbatchsize, - .backends_intra_op_parallelism = getBackendsIntraOpParallelism(), - .backends_inter_op_parallelism = getBackendsInterOpParallelism(), + .backends_intra_op_parallelism = Config_GetBackendsIntraOpParallelism(), + .backends_inter_op_parallelism = Config_GetBackendsInterOpParallelism(), }; buffer = RedisModule_LoadStringBuffer(io, &len); diff --git a/src/serialization/RDB/decoder/previous/v1/decode_v1.c b/src/serialization/RDB/decoder/previous/v1/decode_v1.c index 7b070f7c1..461d0cf3c 100644 --- a/src/serialization/RDB/decoder/previous/v1/decode_v1.c +++ b/src/serialization/RDB/decoder/previous/v1/decode_v1.c @@ -109,8 +109,8 @@ void *RAI_RDBLoadModel_v1(RedisModuleIO *io) { RAI_ModelOpts opts = { .batchsize = batchsize, .minbatchsize = minbatchsize, - .backends_intra_op_parallelism = getBackendsIntraOpParallelism(), - .backends_inter_op_parallelism = getBackendsInterOpParallelism(), + .backends_intra_op_parallelism = Config_GetBackendsIntraOpParallelism(), + .backends_inter_op_parallelism = Config_GetBackendsInterOpParallelism(), }; size_t len = RedisModule_LoadUnsigned(io); diff --git a/src/serialization/RDB/encoder/v2/encode_v2.c b/src/serialization/RDB/encoder/v2/encode_v2.c index bb0782635..f2e34ca2a 100644 --- a/src/serialization/RDB/encoder/v2/encode_v2.c +++ b/src/serialization/RDB/encoder/v2/encode_v2.c @@ -55,7 +55,7 @@ void RAI_RDBSaveModel_v2(RedisModuleIO *io, void *value) { for (size_t i = 0; i < model->noutputs; i++) { RedisModule_SaveStringBuffer(io, model->outputs[i], strlen(model->outputs[i]) + 1); } - long long chunk_size = getModelChunkSize(); + long long chunk_size = Config_GetModelChunkSize(); const size_t n_chunks = len / chunk_size + 1; RedisModule_SaveUnsigned(io, len); RedisModule_SaveUnsigned(io, n_chunks); diff --git a/src/util/queue.c b/src/util/queue.c index 
3c22488e0..e2dffdad9 100644 --- a/src/util/queue.c +++ b/src/util/queue.c @@ -90,7 +90,7 @@ queueItem *queueEvict(queue *queue, queueItem *item) { return item; } -long long queueLength(queue *queue) { return queue->len; } +unsigned long queueLength(queue *queue) { return queue->len; } void queueRelease(queue *queue) { unsigned long len; diff --git a/src/util/queue.h b/src/util/queue.h index af755181d..96aba4154 100644 --- a/src/util/queue.h +++ b/src/util/queue.h @@ -28,5 +28,5 @@ queueItem *queuePop(queue *queue); queueItem *queueFront(queue *queue); queueItem *queueNext(queueItem *item); queueItem *queueEvict(queue *queue, queueItem *item); -long long queueLength(queue *queue); +unsigned long queueLength(queue *queue); void queueRelease(queue *queue); diff --git a/src/util/string_utils.c b/src/util/string_utils.c index 6b567c3ed..49be828e4 100644 --- a/src/util/string_utils.c +++ b/src/util/string_utils.c @@ -1,6 +1,7 @@ #include "string_utils.h" #include "dict.h" #include +#include #include "util/redisai_memory.h" RedisModuleString *RAI_HoldString(RedisModuleString *str) { @@ -52,3 +53,11 @@ void RAI_RStringsKeyDestructor(void *privdata, void *key) { void *RAI_RStringsKeyDup(void *privdata, const void *key) { return RedisModule_CreateStringFromString(NULL, (RedisModuleString *)key); } + +void RAI_StringToUpper(const char *str, char *upper, size_t str_len) { + // Assumption: upper buffer size is at least str_len. This can be used for + // every binary string, we do not assume that the string is null-terminated. + for (size_t i = 0; i < str_len; i++) { + upper[i] = (char)toupper(str[i]); + } +} diff --git a/src/util/string_utils.h b/src/util/string_utils.h index 835fc45e6..9ba143e89 100644 --- a/src/util/string_utils.h +++ b/src/util/string_utils.h @@ -2,6 +2,7 @@ #include "dict.h" RedisModuleString *RAI_HoldString(RedisModuleString *str); +void RAI_StringToUpper(const char *str, char *upper, size_t str_len); uint64_t RAI_StringsHashFunction(const void *key); int RAI_StringsKeyCompare(void *privdata, const void *key1, const void *key2); diff --git a/tests/flow/includes.py b/tests/flow/includes.py index bcbdf3219..6ca9bfe0a 100755 --- a/tests/flow/includes.py +++ b/tests/flow/includes.py @@ -202,6 +202,7 @@ def check_error_message(env, con, error_msg, *command): env.assertEqual(type(exception), redis.exceptions.ResponseError) env.assertEqual(error_msg, str(exception)) + def check_error(env, con, *command): try: con.execute_command(*command) @@ -209,3 +210,11 @@ def check_error(env, con, *command): except Exception as e: exception = e env.assertEqual(type(exception), redis.exceptions.ResponseError) + + +# Returns a dict with all the fields of a certain section from INFO MODULES command +def get_info_section(con, section): + sections = ['ai_versions', 'ai_git', 'ai_load_time_configs', 'ai_backends_info', 'ai_cpu'] + section_ind = [i for i in range(len(sections)) if sections[i] == 'ai_'+section][0] + return {k.split(":")[0]: k.split(":")[1] + for k in con.execute_command("INFO MODULES").decode().split("#")[section_ind+2].split()[1:]} diff --git a/tests/flow/test_data/model_with_infinite_loop.onnx b/tests/flow/test_data/model_with_infinite_loop.onnx new file mode 100644 index 000000000..a4475545e --- /dev/null +++ b/tests/flow/test_data/model_with_infinite_loop.onnx @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:85848c4e1f96f47b62a178d67a6785b3734a5387c6f42064673d794430290862 +size 692 diff --git a/tests/flow/tests_common.py b/tests/flow/tests_common.py index 
9ace58f08..e1a7b6481 100644 --- a/tests/flow/tests_common.py +++ b/tests/flow/tests_common.py @@ -328,17 +328,6 @@ def test_tensorget_disconnect(env): ret = send_and_disconnect(('AI.TENSORGET', 't_FLOAT', 'META'), red) env.assertEqual(ret, None) -def test_info_modules(env): - red = env.getConnection() - ret = red.execute_command('INFO','MODULES') - env.assertEqual( ret['ai_threads_per_queue'], 1 ) - # minimum cpu properties - env.assertEqual( 'ai_self_used_cpu_sys' in ret, True ) - env.assertEqual( 'ai_self_used_cpu_user' in ret, True ) - env.assertEqual( 'ai_children_used_cpu_sys' in ret, True ) - env.assertEqual( 'ai_children_used_cpu_user' in ret, True ) - env.assertEqual( 'ai_queue_CPU_bthread_n1_used_cpu_total' in ret, True ) - def test_lua_multi(env): con = env.getConnection() @@ -360,7 +349,20 @@ def test_lua_multi(env): env.assertEqual(type(exception), redis.exceptions.ResponseError) env.assertEqual("Cannot run RedisAI command within a transaction or a LUA script", exception.__str__()) -def test_info(env): + +def test_info_command(env): con = env.getConnection() - ret = con.execute_command('AI.INFO') - env.assertEqual(6, len(ret)) + versions = get_info_section(con, 'versions') + env.assertEqual(list(versions.keys()), ['ai_RedisAI_version', 'ai_low_level_API_version', 'ai_rdb_version']) + git = get_info_section(con, 'git') + env.assertEqual(list(git.keys()), ['ai_git_sha']) + load_time_configs = get_info_section(con, 'load_time_configs') + env.assertEqual(list(load_time_configs.keys()), ['ai_threads_per_queue', 'ai_inter_op_parallelism', + 'ai_intra_op_parallelism', 'ai_model_execution_timeout']) + # minimum cpu properties + cpu = get_info_section(con, 'cpu') + env.assertTrue('ai_self_used_cpu_sys' in cpu.keys()) + env.assertTrue('ai_self_used_cpu_user' in cpu.keys()) + env.assertTrue('ai_children_used_cpu_sys' in cpu.keys()) + env.assertTrue('ai_children_used_cpu_user' in cpu.keys()) + env.assertTrue('ai_queue_CPU_bthread_n1_used_cpu_total' in cpu.keys()) diff --git a/tests/flow/tests_gears_llapi.py b/tests/flow/tests_gears_llapi.py index fd08dab59..7db9c9bc0 100644 --- a/tests/flow/tests_gears_llapi.py +++ b/tests/flow/tests_gears_llapi.py @@ -421,3 +421,67 @@ def FlattenTensor(record): env.assertEqual(ret, b'OK') ret = con.execute_command('rg.trigger', 'FlattenTensor_test') env.assertEqual(ret[0], b'test_OK') + + +class TestExecuteOnnxModel: + + def __init__(self): + self.env = Env() + if not verify_gears_loaded(self.env): + self.env.skip() + return + script = ''' + +import redisAI + +def OnnxModelRunSync(record): + input_tensor = redisAI.getTensorFromKey('mnist_input{1}') + modelRunner = redisAI.createModelRunner('mnist{1}') + redisAI.modelRunnerAddInput(modelRunner, 'input_name', input_tensor) + redisAI.modelRunnerAddOutput(modelRunner, 'output_name') + try: + res = redisAI.modelRunnerRun(modelRunner) + except Exception as e: + raise e + +async def OnnxModelRunAsync(record): + input_tensor = redisAI.getTensorFromKey('mnist_input{1}') + modelRunner = redisAI.createModelRunner('mnist{1}') + redisAI.modelRunnerAddInput(modelRunner, 'input_name', input_tensor) + redisAI.modelRunnerAddOutput(modelRunner, 'output_name') + res = await redisAI.modelRunnerRunAsync(modelRunner) + redisAI.setTensorInKey('mnist_output{1}', res[0]) + return "OnnxModelRun_OK" + +GB("CommandReader").map(OnnxModelRunSync).register(trigger="OnnxModelRunSync_test1") +GB("CommandReader").map(OnnxModelRunAsync).register(trigger="OnnxModelRunAsync_test2") + ''' + + con = self.env.getConnection() + ret = 
con.execute_command('rg.pyexecute', script) + self.env.assertEqual(ret, b'OK') + + # Load onnx model and its input. + model_pb = load_file_content('mnist.onnx') + sample_raw = load_file_content('one.raw') + ret = con.execute_command('AI.MODELSTORE', 'mnist{1}', 'ONNX', DEVICE, 'BLOB', model_pb) + self.env.assertEqual(ret, b'OK') + con.execute_command('AI.TENSORSET', 'mnist_input{1}', 'FLOAT', 1, 1, 28, 28, 'BLOB', sample_raw) + + def test_sync_run_error(self): + con = self.env.getConnection() + try: + con.execute_command('rg.trigger', 'OnnxModelRunSync_test1') + self.env.assertFalse(True) + except Exception as exception: + self.env.assertEqual(type(exception), redis.exceptions.ResponseError) + self.env.assertTrue(str(exception).find("Cannot execute onnxruntime model synchronously, " + "use async execution instead") >= 0) + + def test_async_run(self): + con = self.env.getConnection() + ret = con.execute_command('rg.trigger', 'OnnxModelRunAsync_test2') + self.env.assertEqual(ret[0], b'OnnxModelRun_OK') + values = con.execute_command('AI.TENSORGET', 'mnist_output{1}', 'VALUES') + argmax = max(range(len(values)), key=lambda i: values[i]) + self.env.assertEqual(argmax, 1) diff --git a/tests/flow/tests_onnx.py b/tests/flow/tests_onnx.py index 743bfd485..027b1cae1 100644 --- a/tests/flow/tests_onnx.py +++ b/tests/flow/tests_onnx.py @@ -296,16 +296,14 @@ def tests_onnx_info(env): return con = env.getConnection() - ret = con.execute_command('AI.INFO') - env.assertEqual(6, len(ret)) + backends_info = get_info_section(con, 'backends_info') + env.assertFalse('ai_onnxruntime_version' in backends_info) linear_model = load_file_content('linear_iris.onnx') - con.execute_command('AI.MODELSTORE', 'linear{1}', 'ONNX', DEVICE, 'BLOB', linear_model) - - ret = con.execute_command('AI.INFO') - env.assertEqual(8, len(ret)) - env.assertEqual(b'ONNX version', ret[6]) + + backends_info = get_info_section(con, 'backends_info') + env.assertTrue('ai_onnxruntime_version' in backends_info) def test_parallelism(): @@ -328,14 +326,12 @@ def test_parallelism(): argmax = max(range(len(values)), key=lambda i: values[i]) env.assertEqual(argmax, 1) - load_time_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[3].split()[1:]} + load_time_config = get_info_section(con, 'load_time_configs') env.assertEqual(load_time_config["ai_inter_op_parallelism"], "1") env.assertEqual(load_time_config["ai_intra_op_parallelism"], "1") env = Env(moduleArgs='INTRA_OP_PARALLELISM 2 INTER_OP_PARALLELISM 2') - load_time_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[3].split()[1:]} + load_time_config = get_info_section(con, 'load_time_configs') env.assertEqual(load_time_config["ai_inter_op_parallelism"], "2") env.assertEqual(load_time_config["ai_intra_op_parallelism"], "2") @@ -348,10 +344,6 @@ def test_onnx_use_custom_allocator(env): con = env.getConnection() model_pb = load_file_content('mul_1.onnx') - ai_memory_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]} - env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory"]), 0) - # Expect using the allocator during model set for allocating the model, its input name and output name: # overall 3 allocations. The model raw size is 130B ,and the names are 2B each. 
In practice we allocate # more than 134B as Redis allocator will use additional memory for its internal management and for the @@ -359,13 +351,12 @@ def test_onnx_use_custom_allocator(env): # (hence will not use additional memory). ret = con.execute_command('AI.MODELSTORE', 'm{1}', 'ONNX', 'CPU', 'BLOB', model_pb) env.assertEqual(ret, b'OK') - ai_memory_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]} + backends_info = get_info_section(con, 'backends_info') # Expect using at least 130+63+(size of an address) + 2*(2+63+(size of an address)) bytes. - model_allocation_bytes_used = int(ai_memory_config["ai_onnxruntime_memory"]) + model_allocation_bytes_used = int(backends_info["ai_onnxruntime_memory"]) env.assertTrue(model_allocation_bytes_used > 334) - env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory_access_num"]), 3) + env.assertEqual(int(backends_info["ai_onnxruntime_memory_access_num"]), 3) con.execute_command('AI.TENSORSET', 'a_mul{1}', 'FLOAT', 3, 2, 'VALUES', 1.0, 2.0, 3.0, 4.0, 5.0, 6.0) # Running the model should access the allocator 6 times: allocating+freeing input+output names, @@ -373,18 +364,16 @@ def test_onnx_use_custom_allocator(env): con.execute_command('AI.MODELEXECUTE', 'm{1}', 'INPUTS', 1, 'a_mul{1}', 'OUTPUTS', 1, 'b{1}') values = con.execute_command('AI.TENSORGET', 'b{1}', 'VALUES') env.assertEqual(values, [b'1', b'4', b'9', b'16', b'25', b'36']) - ai_memory_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]} - env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory_access_num"]), 9) - env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory"]), model_allocation_bytes_used) + backends_info = get_info_section(con, 'backends_info') + env.assertEqual(int(backends_info["ai_onnxruntime_memory_access_num"]), 9) + env.assertEqual(int(backends_info["ai_onnxruntime_memory"]), model_allocation_bytes_used) # Expect using the allocator free function 3 times: when releasing the model, input name and output name. con.execute_command('AI.MODELDEL', 'm{1}') env.assertFalse(con.execute_command('EXISTS', 'm{1}')) - ai_memory_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]} - env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory"]), 0) - env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory_access_num"]), 12) + backends_info = get_info_section(con, 'backends_info') + env.assertEqual(int(backends_info["ai_onnxruntime_memory"]), 0) + env.assertEqual(int(backends_info["ai_onnxruntime_memory_access_num"]), 12) # test the use of Redis allocator in model run op. model_pb = load_file_content('mnist.onnx') @@ -396,13 +385,11 @@ def test_onnx_use_custom_allocator(env): # Expect 18 allocator's access from onnx during the run (in addition to the allocations that were made while # creating the model). 
- ai_memory_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]} - allocator_access_num_before = ai_memory_config["ai_onnxruntime_memory_access_num"] + backends_info = get_info_section(con, 'backends_info') + allocator_access_num_before = backends_info["ai_onnxruntime_memory_access_num"] con.execute_command('AI.MODELEXECUTE', 'm{1}', 'INPUTS', 1, 'a{1}', 'OUTPUTS', 1, 'b{1}') - ai_memory_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]} - allocator_access_num_after = ai_memory_config["ai_onnxruntime_memory_access_num"] + backends_info = get_info_section(con, 'backends_info') + allocator_access_num_after = backends_info["ai_onnxruntime_memory_access_num"] env.assertEqual(int(allocator_access_num_after) - int(allocator_access_num_before), 18) values = con.execute_command('AI.TENSORGET', 'b{1}', 'VALUES') @@ -420,9 +407,6 @@ def test_onnx_use_custom_allocator_with_GPU(env): con = env.getConnection() model_pb = load_file_content('mul_1.onnx') - ai_memory_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]} - env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory"]), 0) # Expect using the allocator during model set for allocating the model, its input name and output name: # overall 3 allocations. The model raw size is 130B ,and the names are 2B each. In practice we allocate @@ -434,14 +418,13 @@ def test_onnx_use_custom_allocator_with_GPU(env): # but for GPU, expect using the allocator only for allocating input and output names (not the model itself). ret = con.execute_command('AI.MODELSTORE', 'm_cpu{1}', 'ONNX', 'CPU', 'BLOB', model_pb) env.assertEqual(ret, b'OK') - ai_memory_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]} + backends_info = get_info_section(con, 'backends_info') # Expect using at least 130+63+(size of an address) + 4*(2+63+(size of an address)) bytes. - model_allocation_bytes_used = int(ai_memory_config["ai_onnxruntime_memory"]) + model_allocation_bytes_used = int(backends_info["ai_onnxruntime_memory"]) env.assertTrue(model_allocation_bytes_used > 472) env.assertTrue(model_allocation_bytes_used < 705) - env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory_access_num"]), 5) + env.assertEqual(int(backends_info["ai_onnxruntime_memory_access_num"]), 5) # Make sure that allocator is not used for running and freeing the GPU model, except for # the input and output names allocations (and deallocations). 
@@ -451,15 +434,84 @@
     env.assertEqual(values, [b'1', b'4', b'9', b'16', b'25', b'36'])
     # Expect that memory usage didn't change, and for another 4 accesses to the allocator (input and output names
     # allocation and free)
-    ai_memory_config = {k.split(":")[0]: k.split(":")[1]
-                        for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]}
-    env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory"]), model_allocation_bytes_used)
-    env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory_access_num"]), 9)
+    backends_info = get_info_section(con, 'backends_info')
+    env.assertEqual(int(backends_info["ai_onnxruntime_memory"]), model_allocation_bytes_used)
+    env.assertEqual(int(backends_info["ai_onnxruntime_memory_access_num"]), 9)

     # Expect only 2 more accesses in delete - for deallocating input and output names
     con.execute_command('AI.MODELDEL', 'm_gpu{1}')
     env.assertFalse(con.execute_command('EXISTS', 'm_gpu{1}'))
-    ai_memory_config = {k.split(":")[0]: k.split(":")[1]
-                        for k in con.execute_command("INFO MODULES").decode().split("#")[4].split()[1:]}
-    env.assertEqual(int(ai_memory_config["ai_onnxruntime_memory_access_num"]), 11)
-
+    backends_info = get_info_section(con, 'backends_info')
+    env.assertEqual(int(backends_info["ai_onnxruntime_memory_access_num"]), 11)
+
+
+class TestOnnxKillSwitch:
+
+    def __init__(self):
+        self.threads_per_queue = 8
+        self.env = Env(moduleArgs='THREADS_PER_QUEUE '+str(self.threads_per_queue)+' MODEL_EXECUTION_TIMEOUT 1000')
+        con = self.env.getConnection()
+        model_with_inf_loop = load_file_content("model_with_infinite_loop.onnx")
+        ret = con.execute_command('AI.MODELSTORE', 'inf_loop_model{1}', 'ONNX', DEVICE, 'BLOB', model_with_inf_loop)
+        self.env.assertEqual(ret, b'OK')
+
+        # Set tensors according to the model inputs. This model consists of two operations of type 'Identity'
+        # (i.e., just output the input), where the second op is wrapped with another op of type 'Loop'. Overall, this
+        # model runs a very large number of iterations without doing anything, until it is caught with the kill switch.
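+        # 9223372036854775807 is INT64_MAX, so the loop below is effectively unbounded
+        # and can only be stopped by the MODEL_EXECUTION_TIMEOUT kill switch.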
+ con.execute_command('AI.TENSORSET', 'iterations{1}', 'INT64', 1, 'VALUES', 9223372036854775807) + con.execute_command('AI.TENSORSET', 'loop_cond{1}', 'BOOL', 1, 'VALUES', 1) + con.execute_command('AI.TENSORSET', 'loop_input{1}', 'FLOAT', 1, 'VALUES', 42) + con.execute_command('AI.TENSORSET', 'outer_scope_input{1}', 'FLOAT', 1, 'VALUES', 42) + + def test_basic(self): + try: + con = self.env.getConnection() + con.execute_command('AI.MODELEXECUTE', 'inf_loop_model{1}', 'INPUTS', 4, 'outer_scope_input{1}', 'iterations{1}', + 'loop_cond{1}', 'loop_input{1}', 'OUTPUTS', 2, 'outer_scope_output{1}', 'loop_output{1}') + self.env.assertTrue(False) + except Exception as exception: + self.env.assertEqual(type(exception), redis.exceptions.ResponseError) + self.env.assertTrue(str(exception).find("Exiting due to terminate flag being set to true") != -1) + + def test_multiple_working_threads(self): + con = self.env.getConnection() + + # Load another onnx model that will be executed on the same threads that use the kill switch + model_pb = load_file_content('mnist.onnx') + sample_raw = load_file_content('one.raw') + ret = con.execute_command('AI.MODELSTORE', 'mnist{1}', 'ONNX', DEVICE, 'BLOB', model_pb) + self.env.assertEqual(ret, b'OK') + con.execute_command('AI.TENSORSET', 'a{1}', 'FLOAT', 1, 1, 28, 28, 'BLOB', sample_raw) + + def run_parallel_onnx_sessions(con): + ret = con.execute_command('AI.MODELEXECUTE', 'mnist{1}', 'INPUTS', 1, 'a{1}', 'OUTPUTS', 1, 'b{1}') + self.env.assertEqual(ret, b'OK') + try: + con.execute_command('AI.MODELEXECUTE', 'inf_loop_model{1}', 'INPUTS', 4, 'outer_scope_input{1}', 'iterations{1}', + 'loop_cond{1}', 'loop_input{1}', 'OUTPUTS', 2, 'outer_scope_output{1}', 'loop_output{1}') + except Exception as exception: + self.env.assertEqual(type(exception), redis.exceptions.ResponseError) + self.env.assertTrue(str(exception).find("Exiting due to terminate flag being set to true") != -1) + ret = con.execute_command('AI.MODELEXECUTE', 'mnist{1}', 'INPUTS', 1, 'a{1}', 'OUTPUTS', 1, 'b{1}') + self.env.assertEqual(ret, b'OK') + run_test_multiproc(self.env, 8, run_parallel_onnx_sessions) + + def test_multiple_devices(self): + con = self.env.getConnection() + # CPU run queue is created from the start, so if we used a device different than CPU, we should + # have maximum of 2*THREADS_PER_QUEUE run sessions, and otherwise we should have THREADS_PER_QUEUE. + devices = {'CPU', DEVICE} + backends_info = get_info_section(con, 'backends_info') + self.env.assertEqual(backends_info['ai_onnxruntime_maximum_run_sessions_number'], + str(len(devices)*self.threads_per_queue)) + + # Load another onnx model as if it runs on a different device (to test existence of multiple queues, and + # the extension of the global onnx run sessions array as a consequence.) 
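+        # Note: 'CPU:1' below is just a distinct device string; each new device string
+        # gets its own run queue with THREADS_PER_QUEUE worker threads.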
+ model_pb = load_file_content('mnist.onnx') + ret = con.execute_command('AI.MODELSTORE', 'mnist_{1}', 'ONNX', 'CPU:1', 'BLOB', model_pb) + self.env.assertEqual(ret, b'OK') + devices.add('CPU:1') + + backends_info = get_info_section(con, 'backends_info') + self.env.assertEqual(backends_info['ai_onnxruntime_maximum_run_sessions_number'], + str(len(devices)*self.threads_per_queue)) diff --git a/tests/flow/tests_pytorch.py b/tests/flow/tests_pytorch.py index 92118b89a..d19347204 100644 --- a/tests/flow/tests_pytorch.py +++ b/tests/flow/tests_pytorch.py @@ -739,14 +739,12 @@ def test_parallelism(): ensureSlaveSynced(con, env) values = con.execute_command('AI.TENSORGET', 'c{1}', 'VALUES') env.assertEqual(values, [b'4', b'6', b'4', b'6']) - load_time_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[3].split()[1:]} + load_time_config = get_info_section(con, 'load_time_configs') env.assertEqual(load_time_config["ai_inter_op_parallelism"], "1") env.assertEqual(load_time_config["ai_intra_op_parallelism"], "1") env = Env(moduleArgs='INTRA_OP_PARALLELISM 2 INTER_OP_PARALLELISM 2') - load_time_config = {k.split(":")[0]: k.split(":")[1] - for k in con.execute_command("INFO MODULES").decode().split("#")[3].split()[1:]} + load_time_config = get_info_section(con, 'load_time_configs') env.assertEqual(load_time_config["ai_inter_op_parallelism"], "2") env.assertEqual(load_time_config["ai_intra_op_parallelism"], "2") @@ -777,12 +775,11 @@ def test_torch_info(env): return con = env.getConnection() - ret = con.execute_command('AI.INFO') - env.assertEqual(6, len(ret)) + backends_info = get_info_section(con, 'backends_info') + env.assertFalse('ai_Torch_version' in backends_info) model_pb = load_file_content('pt-minimal-bb.pt') ret = con.execute_command('AI.MODELSTORE', 'm{1}', 'TORCH', DEVICE, 'BLOB', model_pb) - ret = con.execute_command('AI.INFO') - env.assertEqual(8, len(ret)) - env.assertEqual(b'Torch version', ret[6]) + backends_info = get_info_section(con, 'backends_info') + env.assertTrue('ai_Torch_version' in backends_info) diff --git a/tests/flow/tests_tensorflow.py b/tests/flow/tests_tensorflow.py index d9d6581bd..ed3d268d4 100644 --- a/tests/flow/tests_tensorflow.py +++ b/tests/flow/tests_tensorflow.py @@ -691,14 +691,11 @@ def test_tensorflow_modelexecute_script_execute_resnet(env): def test_tf_info(env): con = env.getConnection() - ret = con.execute_command('AI.INFO') - env.assertEqual(6, len(ret)) - + backends_info = get_info_section(con, 'backends_info') + env.assertFalse('ai_TensorFlow_version' in backends_info) model_pb = load_file_content('graph.pb') - con.execute_command('AI.MODELSTORE', 'm{1}', 'TF', DEVICE, 'INPUTS', 2, 'a', 'b', 'OUTPUTS', 1, 'mul', 'BLOB', model_pb) - - ret = con.execute_command('AI.INFO') - env.assertEqual(8, len(ret)) - env.assertEqual(b'TensorFlow version', ret[6]) + + backends_info = get_info_section(con, 'backends_info') + env.assertTrue('ai_TensorFlow_version' in backends_info) diff --git a/tests/flow/tests_tflite.py b/tests/flow/tests_tflite.py index 9db8be2e9..67a78ab7a 100644 --- a/tests/flow/tests_tflite.py +++ b/tests/flow/tests_tflite.py @@ -191,13 +191,11 @@ def test_tflite_info(env): return con = env.getConnection() - ret = con.execute_command('AI.INFO') - env.assertEqual(6, len(ret)) + backends_info = get_info_section(con, 'backends_info') + env.assertFalse('ai_TensorFlowLite_version' in backends_info) model_pb = load_file_content('mnist_model_quant.tflite') - con.execute_command('AI.MODELSTORE', 
'mnist{1}', 'TFLITE', 'CPU', 'BLOB', model_pb) - ret = con.execute_command('AI.INFO') - env.assertEqual(8, len(ret)) - env.assertEqual(b'TFLite version', ret[6]) + backends_info = get_info_section(con, 'backends_info') + env.assertTrue('ai_TensorFlowLite_version' in backends_info)
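For reference, the reworked `INFO MODULES` output that these tests assert on can be read from any Redis client; the sketch below mirrors what the `get_info_section` helper in `tests/flow/includes.py` does. It is an illustration only, not part of the change: it assumes a local server with RedisAI loaded, the redis-py package, and the field names asserted in `tests/flow/tests_common.py`.

```python
# Minimal sketch: reading the RedisAI INFO sections introduced above.
# Assumes a local Redis server with RedisAI loaded and redis-py installed.
import redis

con = redis.Redis(host="localhost", port=6379)

# redis-py parses the INFO reply into a flat dict, so the ai_* fields
# from the module's sections can be read directly.
info = con.info("modules")
print(info.get("ai_RedisAI_version"))          # from the 'versions' section
print(info.get("ai_threads_per_queue"))        # load-time config
print(info.get("ai_model_execution_timeout"))  # onnxruntime kill-switch timeout (ms)

# Per-model/script runtime stats still come from AI.INFO <key>, e.g.:
# con.execute_command("AI.INFO", "mnist{1}")
```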