Skip to content

Enable AI.SCRIPTRUN on AI.DAGRUN* #383

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 12 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ _Arguments_
* `AI.TENSORSET`
* `AI.TENSORGET`
* `AI.MODELRUN`
* `AI.SCRIPTRUN`

_Return_

Expand All @@ -574,6 +575,30 @@ redis> AI.DAGRUN PERSIST 1 predictions |>
3) "\x00\x00\x80?\x00\x00\x00@\x00\x00@@\x00\x00\x80@"
```

A common pattern is enqueuing multiple SCRIPTRUN and MODELRUN commands within a DAG. The following example uses ResNet-50 to classify images into 1000 object categories. Given that our input tensor contains each color represented as an 8-bit integer, and that neural networks usually work with floating-point tensors as their input, we need to cast the tensor to floating-point and normalize the pixel values — for that we will use the `pre_process_3ch` function.

To optimize the classification process we can use a post process script to return only the category position with the maximum classification - for that we will use `post_process` script. Using the DAG capabilities we've removed the necessity of storing the intermediate tensors in the keyspace. You can even run the entire process without storing the output tensor, as follows:

```
redis> AI.DAGRUN_RO |>
AI.TENSORSET image UINT8 224 224 3 BLOB b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00....' |>
AI.SCRIPTRUN imagenet_script pre_process_3ch INPUTS image OUTPUTS temp_key1 |>
AI.MODELRUN imagenet_model INPUTS temp_key1 OUTPUTS temp_key2 |>
AI.SCRIPTRUN imagenet_script post_process INPUTS temp_key2 OUTPUTS output |>
AI.TENSORGET output VALUES
1) OK
2) OK
3) OK
4) OK
5) 1) 1) (integer) 111
```

As visible in the array reply, the label position with the highest classification score was 111.

By combining multiple SCRIPTRUN and MODELRUN commands within a DAG we've substantially reduced the overall required bandwidth and network RX (we're now returning a tensor with 1000 times fewer elements per classification).



!!! warning "Intermediate memory overhead"
The execution of models and scripts within the DAG may generate intermediate tensors that are not allocated by the Redis allocator, but by whatever allocator is used in the backends (which may act on main memory or GPU memory, depending on the device), thus not being limited by `maxmemory` configuration settings of Redis.

Expand Down
206 changes: 206 additions & 0 deletions src/dag.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,38 @@ void *RedisAI_DagRunSession(RedisAI_RunInfo *rinfo) {
}
break;
}
case REDISAI_DAG_CMD_SCRIPTRUN: {
const int parse_result = RedisAI_Parse_ScriptRun_RedisCommand(
NULL, currentOp->argv, currentOp->argc, &(currentOp->sctx),
&(currentOp->outkeys), &(currentOp->sctx->script), 1,
&(rinfo->dagTensorsContext), 0, NULL, currentOp->err);

if (parse_result > 0) {
currentOp->result = REDISMODULE_OK;
const long long start = ustime();
currentOp->result = RAI_ScriptRun(currentOp->sctx, currentOp->err);
currentOp->duration_us = ustime() - start;
const size_t noutputs = RAI_ScriptRunCtxNumOutputs(currentOp->sctx);
for (size_t outputNumber = 0; outputNumber < noutputs;
outputNumber++) {
RAI_Tensor *tensor =
RAI_ScriptRunCtxOutputTensor(currentOp->sctx, outputNumber);
if (tensor) {
const char *key_string = RedisModule_StringPtrLen(
currentOp->outkeys[outputNumber], NULL);
const char *dictKey = RedisModule_Strdup(key_string);
AI_dictReplace(rinfo->dagTensorsContext, (void*)dictKey, tensor);
} else {
RAI_SetError(currentOp->err, RAI_EMODELRUN,
"ERR output tensor on DAG's SCRIPTRUN was null");
currentOp->result = REDISMODULE_ERR;
}
}
} else {
currentOp->result = REDISMODULE_ERR;
}
break;
}
default: {
/* unsupported DAG's command */
RAI_SetError(currentOp->err, RAI_EDAGRUN,
Expand Down Expand Up @@ -163,6 +195,22 @@ int RedisAI_DagRun_Reply(RedisModuleCtx *ctx, RedisModuleString **argv,
break;
}

      case REDISAI_DAG_CMD_SCRIPTRUN: {
        /* Each DAG op contributes exactly one element to the array reply. */
        rinfo->dagReplyLength++;
        /* Record per-script run statistics (duration / error counters),
         * keyed by the script key name. */
        struct RedisAI_RunStats *rstats = NULL;
        const char *runkey =
            RedisModule_StringPtrLen(currentOp->runkey, NULL);
        RAI_GetRunStats(runkey,&rstats);
        if (currentOp->result == REDISMODULE_ERR) {
          /* args: duration=0, calls=1, errors=1, samples=0 — presumably;
           * TODO(review): confirm RAI_SafeAddDataPoint parameter order. */
          RAI_SafeAddDataPoint(rstats,0,1,1,0);
          RedisModule_ReplyWithError(ctx, currentOp->err->detail_oneline);
        } else {
          RAI_SafeAddDataPoint(rstats,currentOp->duration_us,1,0,0);
          RedisModule_ReplyWithSimpleString(ctx, "OK");
        }
        break;
      }

default:
/* no-op */
break;
Expand Down Expand Up @@ -325,3 +373,161 @@ int RAI_parseDAGPersistArgs(RedisModuleCtx *ctx, RedisModuleString **argv,
}
return argpos;
}

/**
 * Parse an AI.DAGRUN / AI.DAGRUN_RO command, build the RedisAI_RunInfo
 * describing the op chain, and enqueue it on the run queue of the device
 * shared by all MODELRUN/SCRIPTRUN ops (CPU when the DAG has none).
 *
 * The client is blocked; RedisAI_DagRun_Reply produces the final reply from
 * the background thread.
 *
 * @param ctx Redis module context
 * @param argv command arguments
 * @param argc number of arguments
 * @param dagMode REDISAI_DAG_READONLY_MODE or REDISAI_DAG_WRITE_MODE
 * @return REDISMODULE_OK on successful enqueue, REDISMODULE_ERR otherwise
 *         (an error reply has already been sent in the latter case)
 */
int RedisAI_DagRunSyntaxParser(RedisModuleCtx *ctx, RedisModuleString **argv,
                               int argc, int dagMode) {
  if (argc < 4) return RedisModule_WrongArity(ctx);

  RedisAI_RunInfo *rinfo = NULL;
  if (RAI_InitRunInfo(&rinfo) == REDISMODULE_ERR) {
    return RedisModule_ReplyWithError(
        ctx,
        "ERR Unable to allocate the memory and initialise the RedisAI_RunInfo "
        "structure");
  }
  rinfo->use_local_context = 1;
  /* The DAG always starts with one op; "|>" separators append more. */
  RAI_DagOp *currentDagOp = NULL;
  RAI_InitDagOp(&currentDagOp);
  array_append(rinfo->dagOps, currentDagOp);

  int persistFlag = 0;
  int loadFlag = 0;
  int chainingOpCount = 0;
  /* Device shared by all model/script ops; multi-device DAGs are rejected. */
  const char *deviceStr = NULL;

  for (size_t argpos = 1; argpos <= argc - 1; argpos++) {
    const char *arg_string = RedisModule_StringPtrLen(argv[argpos], NULL);
    if (!strcasecmp(arg_string, "LOAD")) {
      loadFlag = 1;
      const int parse_result = RAI_parseDAGLoadArgs(
          ctx, &argv[argpos], argc - argpos, &(rinfo->dagTensorsLoadedContext),
          &(rinfo->dagTensorsContext), "|>");
      if (parse_result > 0) {
        argpos += parse_result - 1;
      } else {
        RAI_FreeRunInfo(ctx, rinfo);
        return REDISMODULE_ERR;
      }
    } else if (!strcasecmp(arg_string, "PERSIST")) {
      if (dagMode == REDISAI_DAG_READONLY_MODE) {
        RAI_FreeRunInfo(ctx, rinfo);
        return RedisModule_ReplyWithError(
            ctx, "ERR PERSIST cannot be specified in a read-only DAG");
      }
      persistFlag = 1;
      const int parse_result =
          RAI_parseDAGPersistArgs(ctx, &argv[argpos], argc - argpos,
                                  &(rinfo->dagTensorsPersistentContext), "|>");
      if (parse_result > 0) {
        argpos += parse_result - 1;
      } else {
        RAI_FreeRunInfo(ctx, rinfo);
        return REDISMODULE_ERR;
      }
    } else if (!strcasecmp(arg_string, "|>")) {
      // on the first pipe operator, if LOAD or PERSIST were used, we've already
      // allocated memory
      if (!((persistFlag == 1 || loadFlag == 1) && chainingOpCount == 0)) {
        rinfo->dagNumberCommands++;
        RAI_DagOp *currentDagOp = NULL;
        RAI_InitDagOp(&currentDagOp);
        array_append(rinfo->dagOps, currentDagOp);
      }
      chainingOpCount++;
    } else {
      /* Anything else is part of the current op: tag the op type on the
       * command keyword, then retain/append every token for later re-parsing
       * on the background thread. */
      if (!strcasecmp(arg_string, "AI.TENSORGET")) {
        rinfo->dagOps[rinfo->dagNumberCommands]->commandType =
            REDISAI_DAG_CMD_TENSORGET;
      }
      if (!strcasecmp(arg_string, "AI.TENSORSET")) {
        rinfo->dagOps[rinfo->dagNumberCommands]->commandType =
            REDISAI_DAG_CMD_TENSORSET;
      }
      if (!strcasecmp(arg_string, "AI.MODELRUN")) {
        if (argc - 2 < argpos) {
          /* Fixed: free the partially-built run info before replying —
           * the early return previously leaked rinfo and all its ops. */
          RAI_FreeRunInfo(ctx, rinfo);
          return RedisModule_WrongArity(ctx);
        }
        rinfo->dagOps[rinfo->dagNumberCommands]->commandType =
            REDISAI_DAG_CMD_MODELRUN;
        RAI_Model *mto;
        RedisModuleKey *modelKey;
        const int status = RAI_GetModelFromKeyspace(
            ctx, argv[argpos + 1], &modelKey, &mto, REDISMODULE_READ);
        if (status == REDISMODULE_ERR) {
          RAI_FreeRunInfo(ctx, rinfo);
          return REDISMODULE_ERR;
        }
        if (deviceStr == NULL) {
          deviceStr = mto->devicestr;
        } else {
          // If the device strings are not equivalent, reply with error ( for
          // now )
          if (strcasecmp(mto->devicestr, deviceStr) != 0) {
            RAI_FreeRunInfo(ctx, rinfo);
            return RedisModule_ReplyWithError(
                ctx, "ERR multi-device DAGs not supported yet");
          }
        }
        rinfo->dagOps[rinfo->dagNumberCommands]->runkey = argv[argpos + 1];
        rinfo->dagOps[rinfo->dagNumberCommands]->mctx =
            RAI_ModelRunCtxCreate(mto);
      }
      if (!strcasecmp(arg_string, "AI.SCRIPTRUN")) {
        if (argc - 3 < argpos) {
          /* Fixed: same leak as the MODELRUN wrong-arity path. */
          RAI_FreeRunInfo(ctx, rinfo);
          return RedisModule_WrongArity(ctx);
        }
        rinfo->dagOps[rinfo->dagNumberCommands]->commandType =
            REDISAI_DAG_CMD_SCRIPTRUN;
        RAI_Script *sto;
        RedisModuleKey *scriptKey;
        const int status = RAI_GetScriptFromKeyspace(
            ctx, argv[argpos + 1], &scriptKey, &sto, REDISMODULE_READ);
        if (status == REDISMODULE_ERR) {
          RAI_FreeRunInfo(ctx, rinfo);
          return REDISMODULE_ERR;
        }
        if (deviceStr == NULL) {
          deviceStr = sto->devicestr;
        } else {
          // If the device strings are not equivalent, reply with error ( for
          // now )
          if (strcasecmp(sto->devicestr, deviceStr) != 0) {
            RAI_FreeRunInfo(ctx, rinfo);
            return RedisModule_ReplyWithError(
                ctx, "ERR multi-device DAGs not supported yet");
          }
        }
        const char *functionName =
            RedisModule_StringPtrLen(argv[argpos + 2], NULL);
        rinfo->dagOps[rinfo->dagNumberCommands]->runkey = argv[argpos + 1];
        rinfo->dagOps[rinfo->dagNumberCommands]->sctx =
            RAI_ScriptRunCtxCreate(sto, functionName);
      }
      /* Retain the token: the op's argv outlives this command invocation. */
      RedisModule_RetainString(NULL, argv[argpos]);
      array_append(rinfo->dagOps[rinfo->dagNumberCommands]->argv, argv[argpos]);
      rinfo->dagOps[rinfo->dagNumberCommands]->argc++;
    }
  }

  RunQueueInfo *run_queue_info = NULL;
  // If there was no MODELRUN or SCRIPTRUN on the DAG, we default all ops to CPU
  if (deviceStr == NULL) {
    deviceStr = "CPU";
  }
  // If the queue does not exist, initialize it
  if (ensureRunQueue(deviceStr, &run_queue_info) == REDISMODULE_ERR) {
    RAI_FreeRunInfo(ctx, rinfo);
    return RedisModule_ReplyWithError(ctx,
                                      "ERR Queue not initialized for device");
  }

  /* Block the client; the reply is produced by RedisAI_DagRun_Reply once the
   * background thread finishes running the DAG. */
  rinfo->client =
      RedisModule_BlockClient(ctx, RedisAI_DagRun_Reply, NULL, NULL, 0);

  pthread_mutex_lock(&run_queue_info->run_queue_mutex);
  queuePush(run_queue_info->run_queue, rinfo);
  pthread_cond_signal(&run_queue_info->queue_condition_var);
  pthread_mutex_unlock(&run_queue_info->run_queue_mutex);

  return REDISMODULE_OK;
}
14 changes: 14 additions & 0 deletions src/dag.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,18 @@ int RAI_parseDAGPersistArgs(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc, AI_dict **localContextDict,
const char *chaining_operator);

/**
 * DAGRUN and DAGRUN_RO parser, which reads the sequence of
* arguments and decides whether the sequence conforms to the syntax
* specified by the DAG grammar.
*
* @param ctx Context in which Redis modules operate
* @param argv Redis command arguments, as an array of strings
* @param argc Redis command number of arguments
* @param dagMode access mode, for now REDISAI_DAG_READONLY_MODE or REDISAI_DAG_WRITE_MODE
* @return
*/
int RedisAI_DagRunSyntaxParser(RedisModuleCtx *ctx, RedisModuleString **argv,
int argc, int dagMode);

#endif /* SRC_DAG_H_ */
6 changes: 1 addition & 5 deletions src/model.c
Original file line number Diff line number Diff line change
Expand Up @@ -556,11 +556,6 @@ int RedisAI_Parse_ModelRun_RedisCommand(RedisModuleCtx *ctx,
if (!strcasecmp(arg_string, "OUTPUTS") && outputs_flag_count == 0) {
is_input = 1;
outputs_flag_count = 1;
const size_t expected_noutputs = argc - argpos - 1;
// if (expected_noutputs > 0) {
// *outkeys =
// RedisModule_Calloc(expected_noutputs, sizeof(RedisModuleString *));
// }
} else {
RedisModule_RetainString(ctx, argv[argpos]);
if (is_input == 0) {
Expand Down Expand Up @@ -610,6 +605,7 @@ int RedisAI_Parse_ModelRun_RedisCommand(RedisModuleCtx *ctx,
} else {
RedisModule_ReplyWithError(ctx, "ERR Output key not found");
}
return -1;
}
*outkeys=array_append(*outkeys,argv[argpos]);
// (*outkeys)[noutputs] = argv[argpos];
Expand Down
Loading