Skip to content

Commit

Permalink
[fix][dingo-exec] Resolve the issue of null values in join connection…
Browse files Browse the repository at this point in the history
… columns
  • Loading branch information
guojn1 authored and githubgxll committed Feb 26, 2025
1 parent 2a1343a commit b75c004
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static List<Vertex> visit(
rel.getJoinType() == JoinRelType.LEFT || rel.getJoinType() == JoinRelType.FULL,
rel.getJoinType() == JoinRelType.RIGHT || rel.getJoinType() == JoinRelType.FULL
);
param.setJoinType(rel.getJoinType().lowerName);
param.setOtherExpr(otherCondition);
Vertex vertex = new Vertex(HASH_JOIN, param);
vertex.setId(idGenerator.getOperatorId(taskId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) {
if (pin == 0) { // left
waitRightFinFlag(param);
TupleKey leftKey = new TupleKey(leftMapping.revMap(tuple));
boolean isEmpty = isEmpty(leftKey, param);
if (isEmpty && ("inner".equalsIgnoreCase(param.getJoinType()) || "right".equalsIgnoreCase(param.getJoinType()))) {
return true;
}
if (isEmpty && "left".equalsIgnoreCase(param.getJoinType())) {
Object[] newTuple = Arrays.copyOf(tuple, leftLength + rightLength);
Arrays.fill(newTuple, leftLength, leftLength + rightLength, null);
return pushToNext(param, edge, context, newTuple);
}
List<TupleWithJoinFlag> rightList = param.getHashMap().get(leftKey);
if (rightList != null) {
for (TupleWithJoinFlag t : rightList) {
Expand All @@ -77,6 +86,9 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) {
}
} else if (pin == 1) { //right
TupleKey rightKey = new TupleKey(rightMapping.revMap(tuple));
if (isEmpty(rightKey, param) && "inner".equalsIgnoreCase(param.getJoinType())) {
return true;
}
List<TupleWithJoinFlag> list = param.getHashMap()
.computeIfAbsent(rightKey, k -> Collections.synchronizedList(new LinkedList<>()));
list.add(new TupleWithJoinFlag(tuple));
Expand Down Expand Up @@ -163,4 +175,19 @@ private static boolean pushToNext(HashJoinParam param, Edge edge, Context contex
return edge.transformToNext(context, tuple);
}
}

public static boolean isEmpty(TupleKey tupleKey, HashJoinParam param) {
if (param.rightMappingEmpty && param.leftMappingEmpty) {
return false;
}
Object[] tuple = tupleKey.getTuple();
if (tuple != null) {
for (Object item : tuple) {
if (item != null) {
return false;
}
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,20 @@ public class HashJoinParam extends AbstractParams {
@Setter
private transient CompletableFuture<Void> future;

@Getter
@Setter
public Profile profileLeft;
@Getter
@Setter
public Profile profileRight;

@Getter
@Setter
public SqlExpr otherExpr;

@Setter
public String joinType;

public boolean leftMappingEmpty;
public boolean rightMappingEmpty;


public HashJoinParam(
TupleMapping leftMapping,
Expand All @@ -83,6 +86,8 @@ public HashJoinParam(
this.rightLength = rightLength;
this.leftRequired = leftRequired;
this.rightRequired = rightRequired;
this.leftMappingEmpty = this.leftMapping.size() == 0;
this.rightMappingEmpty = this.rightMapping.size() == 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,19 @@ public static void initTableByTemplate(String schema,
}

public static void synchronizeTenant() {
List<Object> tenantObjList = io.dingodb.meta.InfoSchemaService.root().listTenant();
tenantObjList.forEach(object -> {
Tenant tenant = (Tenant) object;
if (!MetaService.ROOT.existsTenant(tenant.getId())) {
MetaService.ROOT.createTenant(tenant);
LogUtils.info(log, "synchronize tenant id to coordinator:{}", tenant.getId());
}
});
try {
List<Object> tenantObjList = io.dingodb.meta.InfoSchemaService.root().listTenant();
tenantObjList.forEach(object -> {
Tenant tenant = (Tenant) object;
if (!MetaService.ROOT.existsTenant(tenant.getId())) {
MetaService.ROOT.createTenant(tenant);
LogUtils.info(log, "synchronize tenant id to coordinator:{}", tenant.getId());
}
});
LogUtils.info(log, "synchronizeTenant done");
} catch (Exception e) {
LogUtils.error(log, e.getMessage(), e);
}
}

private static boolean continueRetry() {
Expand Down

0 comments on commit b75c004

Please # to comment.