Distributed Join
The purpose of this discussion is to raise ideas and stimulate brainstorming about implementing a join between two indices as part of query execution.
General
In the literature, a join operation can be implemented in the following ways:
Nested loop join
The nested loop join is the simplest join algorithm.
One of the relations is nominated as the inner relation and the other as the outer relation. Each tuple of the outer relation is compared with each tuple of the inner relation, and if the join condition is satisfied, the tuples of relations L and R are concatenated and added to the returned virtual relation.
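As a minimal illustration of the algorithm, the sketch below joins two in-memory relations with a nested loop. Rows are modeled as plain maps and the join condition as an arbitrary predicate; these shapes are assumptions made purely for the example, not an existing API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Nested loop join over two in-memory relations. Rows are modeled as Map<String, Object>
// and the join condition as an arbitrary predicate over a pair of rows.
public final class NestedLoopJoin {

    public static List<Map<String, Object>> join(
            List<Map<String, Object>> outer,                               // relation L
            List<Map<String, Object>> inner,                               // relation R
            BiPredicate<Map<String, Object>, Map<String, Object>> condition) {

        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> l : outer) {                              // each tuple of the outer relation
            for (Map<String, Object> r : inner) {                          // compared with each tuple of the inner one
                if (condition.test(l, r)) {
                    Map<String, Object> joined = new HashMap<>(l);
                    joined.putAll(r);                                      // concatenate L and R into one virtual row
                    result.add(joined);
                }
            }
        }
        return result;
    }
}
```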
Primitive nested loop
For some relations, the nested loop operation can be executed directly on a worker node.
This is the case for queries involving a CROSS JOIN, where each shard sends its data to the worker node.
Afterwards, this node runs the nested loop, applies limits, etc., and ultimately returns the results. Joins can be nested, so instead of collecting data from shards, the input rows can be the result of a previous join or a table function.
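A hedged sketch of that flow, reusing the NestedLoopJoin sketch above: the worker receives the rows shipped from each shard, runs the nested loop with an always-true condition (CROSS JOIN), and applies the limit. The input shapes are assumptions for illustration and do not correspond to an existing Elasticsearch API.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Worker-node flow for a primitive CROSS JOIN: flatten the rows shipped from each shard,
// run the nested loop with an always-true condition, then apply the limit.
public final class PrimitiveCrossJoin {

    public static List<Map<String, Object>> execute(
            List<List<Map<String, Object>>> leftShardRows,   // rows received from each left-hand shard
            List<List<Map<String, Object>>> rightShardRows,  // rows received from each right-hand shard
            int limit) {

        List<Map<String, Object>> left = flatten(leftShardRows);
        List<Map<String, Object>> right = flatten(rightShardRows);

        // CROSS JOIN: every pair of rows satisfies the (absent) condition.
        return NestedLoopJoin.join(left, right, (l, r) -> true).stream()
                .limit(limit)
                .collect(Collectors.toList());
    }

    private static List<Map<String, Object>> flatten(List<List<Map<String, Object>>> shards) {
        return shards.stream().flatMap(List::stream).collect(Collectors.toList());
    }
}
```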
Join Internals
Shards locality
In Elasticsearch, an index is composed of shards, where each shard is an independent Lucene index.
In addition to these (primary) shards that are distributed across the cluster, there may be replica shards which are mainly used for high availability.
The replica shards are also distributed across the cluster, at locations different from those of the primary shards. This can be helpful when joining indices, since we always prefer node-local joins over transferring data over the wire.
Size Consideration
On many occasions we will need to join two datasets that are bigger than a single machine's memory can handle.
In such scenarios, we will employ a split technique that breaks each index up into small enough, digestible elements (blocks) that can be joined together.
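A minimal sketch of the splitting idea, assuming rows can be streamed out of a shard one at a time and that the block budget is expressed as a row count (a byte budget would work the same way); the iterator shape is illustrative, not an existing API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Splits a stream of rows coming out of one shard into bounded blocks that fit in memory.
public final class BlockSplitter {

    public static Iterator<List<Map<String, Object>>> blocks(
            Iterator<Map<String, Object>> shardRows, int maxRowsPerBlock) {

        return new Iterator<List<Map<String, Object>>>() {
            @Override public boolean hasNext() {
                return shardRows.hasNext();
            }

            @Override public List<Map<String, Object>> next() {
                List<Map<String, Object>> block = new ArrayList<>(maxRowsPerBlock);
                while (shardRows.hasNext() && block.size() < maxRowsPerBlock) {
                    block.add(shardRows.next());   // fill the block up to the budget
                }
                return block;                      // small enough to be joined in memory
            }
        };
    }
}
```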
Execution Order
When joining two indices, we have two sets of shards that are distributed across the cluster.
For distributed query execution, ideally, each node in the cluster should perform a portion of the work required for the join.
One of the consequences of the distributed hash join algorithm is that the left (outer) table is split into blocks and the right (inner) table must be read once for every block. To minimize the total number of rows read, we swap the tables if necessary to ensure that the right-hand table is the smaller of the two.
Each node must:
Identify the left-hand index shards that are kept locally and then split those shards into blocks.
Broadcast the right-hand index shards that are kept locally to all other nodes.
Use the complete set of rows for the right-hand index, taken from the shards residing locally on the node and the ones received from other nodes.
Produce a partial result set by running the hash join algorithm on the local left-hand blocks against the locally assembled copy of the right-hand index (a sketch of this step follows the list).
Partial result sets are then transmitted to the handling node for merging and further processing (e.g., applying operations like ORDER BY, LIMIT, OFFSET, and so on) before being returned to the client.
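As a concrete illustration of the per-node hash join step described above, here is a minimal sketch that assumes rows are modeled as simple maps and that the full right-hand row set (local shards plus rows broadcast by other nodes) has already been assembled; class, method, and field names are illustrative, not an existing Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Per-node hash join under the basic approach: the complete right-hand row set is loaded
// into a hash table keyed by the join key, and each local left-hand block is probed against it.
public final class LocalHashJoin {

    // Build the hash table once per node from the full right-hand row set (build side).
    public static Map<Object, List<Map<String, Object>>> buildSide(
            List<Map<String, Object>> rightRows, String rightKeyField) {
        Map<Object, List<Map<String, Object>>> byKey = new HashMap<>();
        for (Map<String, Object> row : rightRows) {
            byKey.computeIfAbsent(row.get(rightKeyField), k -> new ArrayList<>()).add(row);
        }
        return byKey;
    }

    // Probe one local left-hand block against the build side and emit the joined rows.
    public static List<Map<String, Object>> joinBlock(
            List<Map<String, Object>> leftBlock,
            Map<Object, List<Map<String, Object>>> rightByKey,
            String leftKeyField) {

        List<Map<String, Object>> partialResult = new ArrayList<>();
        for (Map<String, Object> leftRow : leftBlock) {
            List<Map<String, Object>> matches = rightByKey.get(leftRow.get(leftKeyField));
            if (matches == null) continue;
            for (Map<String, Object> rightRow : matches) {
                Map<String, Object> joined = new HashMap<>(leftRow);
                joined.putAll(rightRow);                  // concatenate matching rows
                partialResult.add(joined);
            }
        }
        return partialResult;                             // later sent to the handling node
    }
}
```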
This is the basic approach, and it has a serious drawback: data is transferred across the cluster between the nodes. While some data transfer is mandatory to perform the join, it would be most helpful if we could transfer only the "correct" rows between the nodes.
"Correct" here means rows that will actually produce a join result, as opposed to rows that do not fulfill the join condition.
Reducing Data transfer
We will employ a Bloom filter (built from the right-hand index identifiers) to create a fixed-size byte array (with a pre-selected error rate) that we exchange between the nodes.
This filter would be used as follows:
Each node must:
Identify the left-hand index shards that are kept locally and then split those shards into blocks.
Calculate a Bloom filter over the join keys of the right-hand index shards that are kept locally, and broadcast that filter to all other nodes.
Receive back from each remote node the subset of suspected-positive ids (left-hand keys that pass the filter).
Match the suspected ids against the actual ids residing locally, and for the true positives,
send the matching right-hand rows back to the node that reported them.
Conversely, whenever a Bloom filter is received from another node:
test the local left-hand block rows against the arriving filter,
send only the suspected-positive rows back to the node that sent the filter, and
receive the true-positive right-hand rows from that node to use for the join operation (a sketch of this exchange follows the list).
Use the complete set of relevant rows for the right-hand index, taken from the shards residing locally on the node and the true-positive rows received from other nodes.
Produce a partial result set by running the hash join algorithm on the local left-hand blocks against this locally assembled copy of the right-hand index.
Partial result sets are then transmitted to the handling node for merging and further processing (e.g., applying operations like ORDER BY, LIMIT, OFFSET, and so on) before being returned to the client.
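The following is a minimal sketch of the three local operations in this exchange. It uses Guava's BloomFilter purely as a stand-in for any filter with a fixed-size, serializable representation; the class and method names, and the assumption that join keys are strings, are illustrative rather than an existing Elasticsearch API.

```java
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public final class BloomExchange {

    // 1. Filter sender: build a filter over the join keys of the locally held right-hand shards.
    public static BloomFilter<String> buildFilter(Set<String> localRightKeys, double errorRate) {
        BloomFilter<String> filter = BloomFilter.create(
                Funnels.stringFunnel(StandardCharsets.UTF_8), localRightKeys.size(), errorRate);
        localRightKeys.forEach(filter::put);
        return filter;                                   // serialized and broadcast to other nodes
    }

    // 2. Filter receiver: test a local left-hand block against an arriving filter and keep
    //    only the suspected-positive keys, which are sent back to the filter's sender.
    public static List<String> suspectedPositives(List<Map<String, Object>> leftBlock,
                                                  String keyField,
                                                  BloomFilter<String> arrivingFilter) {
        return leftBlock.stream()
                .map(row -> (String) row.get(keyField))
                .filter(arrivingFilter::mightContain)    // may include false positives
                .collect(Collectors.toList());
    }

    // 3. Filter sender again: verify the suspected keys against the actual local keys and
    //    return only the true-positive right-hand rows, which go back to the requesting node.
    public static List<Map<String, Object>> truePositiveRows(
            List<String> suspectedKeys,
            Map<String, List<Map<String, Object>>> localRightRowsByKey) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (String key : suspectedKeys) {
            List<Map<String, Object>> matches = localRightRowsByKey.get(key);
            if (matches != null) rows.addAll(matches);   // false positives are dropped here
        }
        return rows;
    }
}
```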
Improvement
This technique takes two rounds of data transfer, but the total amount of data being sent is much smaller than with the original algorithm.
Since we employ a statistical approach, we must select the expected error rate in advance; the error rate directly influences the byte-array size of the filter being sent over the wire.
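For reference, the standard Bloom filter sizing math (general theory, not specific to this proposal) relates the number of keys n and the chosen false-positive rate p to the filter size: m = -n * ln(p) / (ln 2)^2 bits, with k = (m / n) * ln 2 hash functions. A small sketch of the calculation:

```java
// Standard Bloom filter sizing: for n expected right-hand keys and a target
// false-positive probability p, the optimal filter needs m = -n*ln(p)/(ln 2)^2 bits
// and k = (m/n)*ln 2 hash functions. Example numbers below are illustrative only.
public final class BloomSizing {

    public static long optimalBits(long expectedKeys, double falsePositiveRate) {
        return (long) Math.ceil(-expectedKeys * Math.log(falsePositiveRate)
                / (Math.log(2) * Math.log(2)));
    }

    public static int optimalHashFunctions(long expectedKeys, long bits) {
        return Math.max(1, (int) Math.round((double) bits / expectedKeys * Math.log(2)));
    }

    public static void main(String[] args) {
        long keys = 10_000_000L;     // keys in the local right-hand shards (illustrative)
        double p = 0.01;             // pre-selected error rate
        long bits = optimalBits(keys, p);
        System.out.printf("%d keys at p=%.2f -> ~%.1f MB filter, %d hash functions%n",
                keys, p, bits / 8.0 / 1024 / 1024, optimalHashFunctions(keys, bits));
    }
}
```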
Node Memory state
Since the new algorithm has an additional step, in which it calculates the Bloom filter, sends it across the wire, and waits for the suspected-positive rows to come back from the other nodes, it must preserve its state so that it does not have to fetch the rows again from the local store.
This state preservation must be taken into account when calculating the block size of the join to be executed.
Trade-off
Since this approach is aimed at very large datasets, we may decide that when the query cost estimator puts the overall join size under some threshold, we simply employ the basic algorithm without the Bloom filter.