Skip to content

Turn dag local context dict into array #582

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 10 commits into from
Feb 2, 2021
229 changes: 84 additions & 145 deletions src/DAG/dag.c

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions src/DAG/dag.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ void RedisAI_DagOpBatchInfo(RedisAI_RunInfo *rinfo, RAI_DagOp *op, size_t *batch
void RedisAI_DagOpBatchingMatch(RedisAI_RunInfo *rinfo1, RAI_DagOp *op1, RedisAI_RunInfo *rinfo2,
RAI_DagOp *op2, int *batched, size_t *inbatchsize);

/**
* @brief Get a tensor from the DAG local context at a given index.
* (This accesses a shared array and requires a read lock.)
* @param rinfo The DAG runInfo.
* @param index The index of the tensor to return from the DAG shared array.
* @return The tensor at the given index (NULL is returned if this tensor hasn't been realized yet)
*/
RAI_Tensor *Dag_GetTensorFromGlobalCtx(RedisAI_RunInfo *rinfo, size_t index);

/**
* @brief Shallow copy and set a tensor in the DAG local context at a given index.
* (This accesses a shared array and requires a write lock.)
* @param rinfo The DAG runInfo.
* @param index The index in the DAG shared array at which to store the given tensor.
* @param t The tensor to shallow copy and store at the given index.
*/
void Dag_SetTensorInGlobalCtx(RedisAI_RunInfo *rinfo, size_t index, RAI_Tensor *t);

/**
* Run the first unrealized DAG operation in rinfo for the given device.
* @param rinfo context in which RedisAI blocking commands operate.
Expand Down
18 changes: 12 additions & 6 deletions src/DAG/dag_builder.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ int RAI_DAGLoadTensor(RAI_DAGRunCtx *run_info, const char *t_name, RAI_Tensor *t

RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)run_info;
RedisModuleString *key_name = RedisModule_CreateString(NULL, t_name, strlen(t_name));
// Add the tensor under its "mangled" key name to the DAG local context dict.
char buf[16];
sprintf(buf, "%04d", 1);
RedisModule_StringAppendBuffer(NULL, key_name, buf, strlen(buf));
AI_dictAdd(rinfo->dagTensorsContext, (void *)key_name,
(void *)RAI_TensorGetShallowCopy(tensor));

// Cannot load more than one tensor under the same name
if (AI_dictFind(rinfo->tensorsNamesToIndices, key_name) != NULL) {
RedisModule_FreeString(NULL, key_name);
return REDISMODULE_ERR;
}

// Add the tensor to the DAG shared tensors and map its name to the relevant index.
size_t index = array_len(rinfo->dagSharedTensors);
AI_dictAdd(rinfo->tensorsNamesToIndices, (void *)key_name, (void *)index);
RAI_TensorGetShallowCopy(tensor);
rinfo->dagSharedTensors = array_append(rinfo->dagSharedTensors, (void *)tensor);
RedisModule_FreeString(NULL, key_name);

return REDISMODULE_OK;
Expand Down
169 changes: 36 additions & 133 deletions src/DAG/dag_execute.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,155 +3,61 @@
#include "background_workers.h"
#include "util/string_utils.h"

void _DAG_SetTensorsInLocalContext(RedisAI_RunInfo *rinfo) {
for (size_t i = 0; i < rinfo->dagOpCount; i++) {
RAI_DagOp *op = rinfo->dagOps[i];
if (op->commandType == REDISAI_DAG_CMD_TENSORSET) {
// Insert the tensor with its mangled (unique) name.
void *t = (void *)RAI_TensorGetShallowCopy(op->outTensor);
AI_dictReplace(rinfo->dagTensorsContext, (void *)op->outkeys[0], t);
}
}
}

int MangleTensorsNames(RedisAI_RunInfo *rinfo) {

int res = REDISMODULE_ERR;
AI_dict *mangled_tensors = AI_dictCreate(&AI_dictTypeHeapRStrings, NULL);
int ValidatePersistKeys(RedisAI_RunInfo *rinfo, AI_dict *tensorsNamesToInd,
AI_dict *persistTensorsNames) {

{
AI_dictIterator *iter = AI_dictGetSafeIterator(rinfo->dagTensorsContext);
AI_dictEntry *entry = AI_dictNext(iter);
while (entry) {
RedisModuleString *key = (RedisModuleString *)AI_dictGetKey(entry);
size_t key_len;
const char *key_str = RedisModule_StringPtrLen(key, &key_len);
RedisModuleString *demangled_key = RedisModule_CreateString(NULL, key_str, key_len - 4);
int *instance = RedisModule_Alloc(sizeof(int));
*instance = 1;
AI_dictAdd(mangled_tensors, (void *)demangled_key, (void *)instance);
RedisModule_FreeString(NULL, demangled_key);
entry = AI_dictNext(iter);
AI_dictIterator *iter = AI_dictGetSafeIterator(persistTensorsNames);
AI_dictEntry *persist_entry;
while ((persist_entry = AI_dictNext(iter))) {
RedisModuleString *persist_key = (RedisModuleString *)AI_dictGetKey(persist_entry);
AI_dictEntry *entry = AI_dictFind(tensorsNamesToInd, persist_key);
if (!entry) {
RAI_SetError(rinfo->err, RAI_EDAGRUN, "ERR PERSIST key cannot be found in DAG");
AI_dictReleaseIterator(iter);
return REDISMODULE_ERR;
}
size_t index = (size_t)AI_dictGetVal(entry);
AI_dictReplace(persistTensorsNames, (void *)persist_key, (void *)index);
}
AI_dictReleaseIterator(iter);
}
return REDISMODULE_OK;
}

int MapTensorsKeysToIndices(RedisAI_RunInfo *rinfo, AI_dict *tensorsNamesToInd) {

for (long long i = 0; i < array_len(rinfo->dagOps); i++) {
RAI_DagOp *currentOp = rinfo->dagOps[i];

RedisModuleString **mangled_inkeys =
array_new(RedisModuleString *, array_len(currentOp->inkeys));
for (long long j = 0; j < array_len(currentOp->inkeys); j++) {
RedisModuleString *key = currentOp->inkeys[j];
AI_dictEntry *entry = AI_dictFind(mangled_tensors, key);
AI_dictEntry *entry = AI_dictFind(tensorsNamesToInd, key);
if (!entry) {
array_free(mangled_inkeys);
RAI_SetError(rinfo->err, RAI_EDAGRUN, "ERR INPUT key cannot be found in DAG");
goto cleanup;
return REDISMODULE_ERR;
}
int *instance = AI_dictGetVal(entry);
char buf[16];
sprintf(buf, "%04d", *instance);
RedisModuleString *mangled_key = RedisModule_CreateStringFromString(NULL, key);
RedisModule_StringAppendBuffer(NULL, mangled_key, buf, strlen(buf));
mangled_inkeys = array_append(mangled_inkeys, mangled_key);
size_t ind = (size_t)AI_dictGetVal(entry);
currentOp->inkeys_indices = array_append(currentOp->inkeys_indices, ind);
}

RedisModuleString **mangled_outkeys =
array_new(RedisModuleString *, array_len(currentOp->outkeys));
for (long long j = 0; j < array_len(currentOp->outkeys); j++) {
RedisModuleString *key = currentOp->outkeys[j];
AI_dictEntry *entry = AI_dictFind(mangled_tensors, key);
int *instance = NULL;
if (entry) {
instance = AI_dictGetVal(entry);
*instance += 1;
} else {
instance = RedisModule_Alloc(sizeof(int));
*instance = 1;
AI_dictAdd(mangled_tensors, (void *)key, (void *)instance);
}
char buf[16];
sprintf(buf, "%04d", *instance);
RedisModuleString *mangled_key = RedisModule_CreateStringFromString(NULL, key);
RedisModule_StringAppendBuffer(NULL, mangled_key, buf, strlen(buf));
mangled_outkeys = array_append(mangled_outkeys, mangled_key);
}

if (currentOp->inkeys) {
for (size_t j = 0; j < array_len(currentOp->inkeys); j++) {
RedisModule_FreeString(NULL, currentOp->inkeys[j]);
}
array_free(currentOp->inkeys);
}

if (currentOp->outkeys) {
for (size_t j = 0; j < array_len(currentOp->outkeys); j++) {
RedisModule_FreeString(NULL, currentOp->outkeys[j]);
}
array_free(currentOp->outkeys);
}

currentOp->inkeys = mangled_inkeys;
currentOp->outkeys = mangled_outkeys;
}
size_t ind = array_len(rinfo->dagSharedTensors);

AI_dict *mangled_persisted = AI_dictCreate(&AI_dictTypeHeapRStrings, NULL);
{
AI_dictIterator *iter = AI_dictGetSafeIterator(rinfo->dagTensorsPersistedContext);
AI_dictEntry *entry = AI_dictNext(iter);
while (entry) {
RedisModuleString *key = (RedisModuleString *)AI_dictGetKey(entry);
AI_dictEntry *mangled_entry = AI_dictFind(mangled_tensors, key);
if (!mangled_entry) {
AI_dictRelease(mangled_persisted);
AI_dictReleaseIterator(iter);
RAI_SetError(rinfo->err, RAI_EDAGRUN, "ERR PERSIST key cannot be found in DAG");
goto cleanup;
}
if (AI_dictFind(mangled_persisted, key) != NULL) {
AI_dictRelease(mangled_persisted);
AI_dictReleaseIterator(iter);
RAI_SetError(rinfo->err, RAI_EDAGRUN, "ERR PERSIST keys must be unique");
goto cleanup;
// Add a new empty placeholder in the array for an output tensor.
// If this is a TENSORSET op, the tensor is already realized.
if (currentOp->commandType == REDISAI_DAG_CMD_TENSORSET) {
RAI_Tensor *t = RAI_TensorGetShallowCopy(currentOp->outTensor);
rinfo->dagSharedTensors = array_append(rinfo->dagSharedTensors, t);
} else {
rinfo->dagSharedTensors = array_append(rinfo->dagSharedTensors, NULL);
}
int *instance = AI_dictGetVal(mangled_entry);
char buf[16];
sprintf(buf, "%04d", *instance);
RedisModuleString *mangled_key = RedisModule_CreateStringFromString(NULL, key);
RedisModule_StringAppendBuffer(NULL, mangled_key, buf, strlen(buf));
AI_dictAdd(mangled_persisted, (void *)mangled_key, (void *)1);
RedisModule_FreeString(NULL, mangled_key);
entry = AI_dictNext(iter);
currentOp->outkeys_indices = array_append(currentOp->outkeys_indices, ind);
AI_dictReplace(tensorsNamesToInd, (void *)key, (void *)ind);
}
AI_dictReleaseIterator(iter);
}

AI_dictRelease(rinfo->dagTensorsPersistedContext);
rinfo->dagTensorsPersistedContext = mangled_persisted;

for (long long i = 0; i < array_len(rinfo->dagOps); i++) {
if (rinfo->dagOps[i]->devicestr == NULL) {
rinfo->dagOps[i]->devicestr = "CPU";
}
}
// Tensors from TENSORSET ops are ready to be put in DAG local context under their mangled
// names.
_DAG_SetTensorsInLocalContext(rinfo);
res = REDISMODULE_OK;

cleanup : {
AI_dictIterator *iter = AI_dictGetSafeIterator(mangled_tensors);
AI_dictEntry *entry = AI_dictNext(iter);
while (entry) {
int *val = (int *)AI_dictGetVal(entry);
RedisModule_Free(val);
entry = AI_dictNext(iter);
}
AI_dictReleaseIterator(iter);
}
AI_dictRelease(mangled_tensors);
return res;
return REDISMODULE_OK;
}

// Add Shallow copies of the DAG run info to the devices' queues.
Expand Down Expand Up @@ -242,7 +148,7 @@ int RAI_DAGRun(RAI_DAGRunCtx *run_info, RAI_OnFinishCB DAGAsyncFinish, void *pri
}
// Make the inkeys and outkeys of the DAG ops unique, to ensure that the operations
// will be executed in the right order.
if (MangleTensorsNames(rinfo) != REDISMODULE_OK) {
if (MapTensorsKeysToIndices(rinfo, rinfo->tensorsNamesToIndices) != REDISMODULE_OK) {
RAI_SetError(err, rinfo->err->code, rinfo->err->detail);
return REDISMODULE_ERR;
}
Expand All @@ -269,16 +175,13 @@ size_t RAI_DAGNumOutputs(RAI_OnFinishCtx *finish_ctx) {
const RAI_Tensor *RAI_DAGOutputTensor(RAI_OnFinishCtx *finish_ctx, size_t index) {
size_t tensor_get_op_ind = -1;
RedisAI_RunInfo *rinfo = (RedisAI_RunInfo *)finish_ctx;

for (size_t i = 0; i < rinfo->dagOpCount; i++) {
RAI_DagOp *op = rinfo->dagOps[i];
if (op->commandType == REDISAI_DAG_CMD_TENSORGET) {
tensor_get_op_ind++;
if (tensor_get_op_ind == index) {
RAI_Tensor *t;
int res = RAI_getTensorFromLocalContext(rinfo->dagTensorsContext, op->inkeys[0], &t,
op->err);
RedisModule_Assert(res == REDISMODULE_OK);
return t;
return Dag_GetTensorFromGlobalCtx(rinfo, op->inkeys_indices[0]);
}
}
}
Expand Down
32 changes: 27 additions & 5 deletions src/DAG/dag_execute.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,39 @@
#include "run_info.h"

/**
* @brief We are given a DAG runInfo of a sequence of operations, each with its own
@brief We are given a DAG runInfo of a sequence of operations, each with its own
input and output keys. The names of the keys will be used to look whether the
inputs to a DAG operation have all been realized by previous operations (or if
they are available as part of LOADed keys from keyspace).
This strategy is fine if keys are not aliased, that is, if a command's output
overwrites the key of a previous command. This would trick DAG operations into
thinking that their input is ready when it's not.
To overcome this, we make key names unique, so that names are not aliased. We
mangle the names by appending a numerical suffix ":0001". After computing, we
demangle the keys in order to persist them.*/
int MangleTensorsNames(RedisAI_RunInfo *rinfo);
To overcome this, we map the input and output tensors of every operation to indices,
in the following way. For every input of an operation having the key "x", we use the index
to which "x" was most recently mapped when it was an output of a previous operation.
For every output of an operation "y", we map it to the next available index in the array.
Every entry in the DAG array initially contains NULL (except for tensors that were loaded
before the DAG run starts).
@param rinfo The DAG runInfo.
@param tensorsNamesToInd A dict mapping every key name of a tensor that appeared
in DAG operation, to the maximal index of the DAG shared array for which they were mapped to.
@returns REDISMODULE_ERR if there exists an operation for which one of the input
tensors didn't appear as an output of a previous operation, REDISMODULE_OK otherwise
*/
int MapTensorsKeysToIndices(RedisAI_RunInfo *rinfo, AI_dict *tensorsNamesToInd);

/**
* @brief Validates that tensors key names to persist appeared in the DAG operations.
* @param rinfo The DAG runInfo.
* @param tensorsNamesToInd A dict mapping every key name of a tensor that appeared
* in DAG operation, to the maximal index of the DAG shared array for which they were mapped to.
* @param persistTensorsNames A hash table that contains the names of the tensors
* to persist when the DAG run is finished.
* @return REDISMODULE_ERR if there exists a tensor key to persist that didn't
* appear in DAG operation, REDISMODULE_OK otherwise
*/
int ValidatePersistKeys(RedisAI_RunInfo *rinfo, AI_dict *tensorsNamesToInd,
AI_dict *persistTensorsNames);

/**
* @brief Run asynchronously a DAG. This will validate that the sequence of DAG ops
Expand Down
Loading