
Snap 3269 #197

Merged
merged 1,830 commits into from
Dec 15, 2019

Conversation

ahshahid

What changes were proposed in this pull request?

Fix for Bug SNAP-3269.
The byte array in the BufferHolder used to create an unsafe JoinedRow is reused, while the generated UnsafeRow itself gets cached in SnappyHashAggregate (when the query plan feeds the output of a window expression into an aggregate function). The previously cached UnsafeRow objects therefore end up pointing at the current contents of the BufferHolder's byte array, causing data corruption.
The fix is to copy the row generated from the WindowExpression.
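The aliasing bug described above can be sketched in a few lines of Python (a hypothetical model, not the actual Spark code): every cached "row" that references a reused buffer ends up reflecting the buffer's final state, while a defensive copy preserves each row.

```python
# Hypothetical sketch of the SNAP-3269 aliasing bug: cached rows that reference
# a reused buffer are corrupted; copying each row before caching fixes it.

def rows_with_shared_buffer(values):
    """Simulates caching rows that all alias one reused buffer."""
    buffer = [None]           # reused holder, like the BufferHolder's byte array
    cached = []
    for v in values:
        buffer[0] = v         # the writer overwrites the shared buffer in place
        cached.append(buffer) # the cache keeps a reference, not a copy
    return [row[0] for row in cached]

def rows_with_copy(values):
    """Simulates the fix: copy each row before it is cached."""
    buffer = [None]
    cached = []
    for v in values:
        buffer[0] = v
        cached.append(list(buffer))  # defensive copy, like UnsafeRow.copy()
    return [row[0] for row in cached]

# Without the copy, every cached row reflects the buffer's final state:
print(rows_with_shared_buffer([1, 2, 3]))  # [3, 3, 3] -- corrupted
print(rows_with_copy([1, 2, 3]))           # [1, 2, 3] -- correct
```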

How was this patch tested?

bug test & precheckin.
The bug test is added in the SnappyData repo.

hbhanawat and others added 30 commits July 8, 2017 21:44
…the Snappy side and do not cause a system.exit (#2)

Instead of using SparkUncaughtExceptionHandler, the executor now gets the registered uncaught exception handler and uses it to handle the exception. In local mode, however, it still uses SparkUncaughtExceptionHandler.

A test has been added in the Snappy side PR for the same.
Used by the new benchmark from the PR adapted for SnappyData for its vectorized implementation.

Build updated to set testOutput and other variables instead of appending to existing values
(causes double append with both snappydata build adding and this adding for its tests)
- don't apply numBuckets in Shuffle partitioning since Shuffle cannot create
  a compatible partitioning with matching numBuckets (only numPartitions)
- check numBuckets too in HashPartitioning compatibility
Changes involve:
1) Reducing the generated code size when writing a struct having all fields of the same data type.
2) Fixing an issue in WholeStageCodegenExec, where a plan supporting codegen was not being prefixed by InputAdapter in case the node did not participate in whole-stage code generation.
… partitioned sample table. (#22)

These changes are related to AQP-79.
Provide preferred location for each bucket-id in case of partitioned sample table.
instead of using the system javac. This eliminates problems when the system JDK
is set differently from JAVA_HOME
This is to provide support for DataSerializable implementation in AQP
corrected offsetInBytes in UnsafeRow.writeToStream
….referenceBuffer (#32)

Use a map instead of a queue for ContextCleaner.referenceBuffer. Profiling shows a lot of time being spent removing from the queue, where a hash map will do (referenceQueue is already present for poll).
This avoids runtime erasure for add/value methods that will result in unnecessary boxing/unboxing overheads.
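The cost difference motivating this change can be sketched as follows (an illustrative Python model; the actual change is in Scala's ContextCleaner): removing an arbitrary reference from a queue or list scans the whole structure, while a hash map removes it in constant time.

```python
# Illustrative sketch of the data-structure change: list removal is O(n),
# map removal is O(1). Names here are hypothetical, not the Spark code.

def remove_from_list(buffer, ref):
    buffer.remove(ref)        # O(n) scan -- the hotspot profiling revealed

def remove_from_map(buffer, ref):
    del buffer[ref]           # O(1) hash lookup

refs = list(range(5))
as_list = list(refs)
as_map = {r: r for r in refs}
remove_from_list(as_list, 3)
remove_from_map(as_map, 3)
print(as_list)         # [0, 1, 2, 4]
print(sorted(as_map))  # [0, 1, 2, 4]
```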

- Adding spark-kafka-sql project
- Update version of deps as per upstream.
- corrected kafka-clients reference
- allow direct UTF8String objects in RDD data conversions to DataFrame;
  new UTF8String.cloneIfRequired to clone only if required used by above
- allow for some precision change in QueryTest result comparison
Missing filter statistics in a filter's logical plan cause incorrect plan selection at times.
Also, join statistics always return sizeInBytes as the product of the children's sizeInBytes,
which results in a very large number. For a join, the product makes sense only for a cartesian
product join. Hence, fixed the Spark code to check the join type: if the join is an equi-join,
  we now sum the children's sizeInBytes instead of taking the product.

For missing filter statistics, added a heuristic sizeInBytes calculation as described below.
If the filtering condition is:
- equal to: sizeInBytes is 5% of the child sizeInBytes
- greater than / less than: sizeInBytes is 50% of the child sizeInBytes
- isNull: sizeInBytes is 50% of the child sizeInBytes
- starts with: sizeInBytes is 10% of the child sizeInBytes
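The heuristics above can be sketched as a small selectivity table (an illustrative Python model with hypothetical names, not the actual Spark/SnappyData API):

```python
# Selectivity fractions mirroring the commit message; names are hypothetical.
SELECTIVITY = {
    "equals": 0.05,       # equal to: 5% of child sizeInBytes
    "comparison": 0.50,   # greater than / less than: 50%
    "is_null": 0.50,      # isNull: 50%
    "starts_with": 0.10,  # starts with: 10%
}

def filter_size_in_bytes(child_size, condition):
    # Missing filter statistics are estimated as a fraction of the child's size.
    return child_size * SELECTIVITY[condition]

def join_size_in_bytes(left_size, right_size, is_equi_join):
    # Equi-joins sum the children's sizes; only a cartesian join multiplies them.
    return left_size + right_size if is_equi_join else left_size * right_size

print(filter_size_in_bytes(1000, "equals"))   # 50.0
print(join_size_in_bytes(1000, 2000, True))   # 3000
print(join_size_in_bytes(1000, 2000, False))  # 2000000
```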
Addition of numBuckets as a default parameter made HashPartitioning incompatible with upstream Apache Spark.
Now adding it separately to restore compatibility.
This is done so that any spill call (due to no EVICTION_DOWN) from within a spill
call returns without doing anything; otherwise it results in an NPE when trying to read
page tables that have already been cleared.
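The guard described above can be modelled as a re-entrancy flag (an illustrative Python sketch with hypothetical names; the actual change is in the Spark memory-manager code):

```python
# A nested spill call bails out immediately instead of re-reading state that
# the outer call has already cleared -- avoiding the NPE described above.

class MemoryConsumer:
    def __init__(self):
        self.spilling = False
        self.page_table = {0: "page"}

    def spill(self):
        if self.spilling:           # re-entrant call: return without doing anything
            return 0
        self.spilling = True
        try:
            freed = len(self.page_table)
            self.page_table.clear()  # state a nested call must not re-read
            self.spill()             # simulate a nested spill during eviction
            return freed
        finally:
            self.spilling = False

print(MemoryConsumer().spill())  # 1
```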
- IN is 50% of original
- StartsWith, EndsWith 10%
- Contains and LIKE at 20%
- AND is multiplication of sizing of left and right (with max filtering of 5%)
- OR is 1/x+1/y sizing of the left and right (with min filtering of 50%)
- NOT three times of that without NOT
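Some of the combination rules above can be sketched as follows (one plausible reading with illustrative names; the exact clamping in the actual patch may differ, and the OR rule is omitted because its formula as written is ambiguous):

```python
# Illustrative selectivity combinators mirroring the commit message.

def sel_in():
    return 0.50            # IN keeps 50% of the original

def sel_contains_or_like():
    return 0.20            # Contains and LIKE at 20%

def sel_and(left, right):
    # AND multiplies the two selectivities, with a maximum filtering of 5%
    return max(left * right, 0.05)

def sel_not(inner):
    # NOT keeps three times the rows of the un-negated predicate (capped at 1)
    return min(3 * inner, 1.0)

print(sel_and(0.5, 0.2))   # 0.1
print(sel_not(0.5))        # 1.0
```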
* UI HTML, CSS and resources changes

* Adding new health status images

* Adding SnappyData Logo.

* Code changes for setting/updating the Spark UI tabs list.

* Adding icon images for Running, Stopped and Warning statuses.

* 1. Adding New method for generating Spark UI page without page header text.
2. Updating CSS: Cluster Normal status text color is changed to match color of Normal health logo.

* Suggestion: Rename Storage Tab to Spark Cache.

*  Resolving Precheckin failure due to scala style comments
:snappy-spark:snappy-spark-core_2.11:scalaStyle
SparkUI.scala message=Insert a space after the start of the comment line=75 column=4
UIUtils.scala message=Insert a space after the start of the comment line=267 column=4
…partitions (#37)

- the reason is that a shuffle is added first with the default shuffle partitions,
  and then the child with the maximum partitions is selected; now marking children where
  an implicit shuffle was introduced and taking the max of the rest (except if there are no others,
      in which case the negative value gets chosen and its abs returns the default shuffle partitions)
- the second change is to add an optional set of alias columns in OrderlessHashPartitioning
  for expression matching to satisfy partitioning in case it is on an alias of a partitioning column
  (helps queries like TPCH Q21 where implicit aliases are introduced to resolve clashes in self-joins);
  data sources can use this to pass projection aliases, if any (only snappydata ones in embedded mode)
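One possible reading of the partition-selection rule above, sketched with hypothetical names (the actual logic lives in Spark's exchange planning and may differ in detail):

```python
# Ignore children where an implicit shuffle was introduced, take the max of the
# rest, and fall back to the default when every child was shuffled.

DEFAULT_SHUFFLE_PARTITIONS = 200  # stand-in for spark.sql.shuffle.partitions

def target_partitions(children):
    """children: list of (num_partitions, implicit_shuffle_added) pairs."""
    candidates = [n for n, shuffled in children if not shuffled]
    return max(candidates) if candidates else DEFAULT_SHUFFLE_PARTITIONS

print(target_partitions([(8, False), (200, True)]))   # 8
print(target_partitions([(200, True), (200, True)]))  # 200
```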
…ions

use child.outputPartitioning.numPartitions for shuffle partition case instead of depending
on it being defaultNumPreShufflePartitions
snappy-sachin and others added 16 commits November 27, 2019 18:16
* SNAP-2919 : Implementation of Structured Streaming UI Tab

Implementation of the Structured Streaming UI tab, which lets users monitor the statistics and progress of structured streaming queries/applications.
The Structured Streaming tab is available both in the TIBCO ComputeDB/SnappyData embedded cluster and in smart connector applications (using the Snappy Spark distribution).

The Structured Streaming tab has the following capabilities:

- Lists all structured streaming queries/applications submitted to the SnappyData cluster using the submit-job command. Similarly, in smart connector mode this tab lists streaming queries executed in the cluster.
- Allows the user to select a query from the left-hand navigation panel to view its details in the main query details panel on the right.
- The query details panel displays the selected query's details and statistics, as listed below:
  -- Query Name if provided, Query Id otherwise
  -- Start Date & Time
  -- Up time
  -- Trigger Interval
  -- Batches Processed
  -- Status
  -- Total Input Records
  -- Current Input Rate
  -- Current Processing Rate
  -- Total Batch Processing Time
  -- Avg. Batch Processing Time
- The query details panel also lists the sources of the streaming query along with each source's details, such as type, description, input records, input rate and processing rate.
- The query details panel also displays the sink details of the streaming query.
- The query details panel depicts the behavioural trends of structured streaming queries using the following charts:
  -- Input Records on every batch
  -- Input Rate vs Processing Rate
  -- Processing Time
  -- Aggregation State, if available
- All statistics displayed on the UI are auto-updated periodically

- Added two configurable parameters in Spark's SQLConf.scala:
      1) spark.sql.streaming.uiRunningQueriesDisplayLimit:
           configures how many queries are displayed on the structured streaming UI.
      2) spark.sql.streaming.uiTrendsMaxSampleSize:
           configures how many historic data points are plotted on the trends charts of the structured streaming UI.
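The effect of a bounded trends sample can be sketched with a fixed-size buffer (an illustrative Python model; the actual storage in the UI code may differ):

```python
# Old samples fall off the chart once the configured limit is reached.
from collections import deque

UI_TRENDS_MAX_SAMPLE_SIZE = 3  # stand-in for spark.sql.streaming.uiTrendsMaxSampleSize

trend_points = deque(maxlen=UI_TRENDS_MAX_SAMPLE_SIZE)
for batch_rate in [10, 20, 30, 40, 50]:
    trend_points.append(batch_rate)  # oldest point is evicted automatically

print(list(trend_points))  # [30, 40, 50]
```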
* Fixes for SNAP-3253:

  - Call to add structured streaming UI tab is shifted to SnappySession class.
  - Renamed method updateUI to updateUIWithStructuredStreamingTab and also changed access modifier from private to protected.
  - Structured Streaming tab prefix-uri changed from "structurestreaming" to "structuredstreaming".
- Removed start date-time value created and stored in HTML Page.
- Code refactoring
 - Fixes for UI component misalignment
 - Sink table border is removed
 - Code cleanup and reformatting
 - Adding Unit in Processing Time chart header
- Adding fix for broken left side query navigation panel sorting.
* Merging cherry-picked commit 21fde57 from apache/spark master to support multi-line JSON parsing

* Provided a single-argument constructor so that existing SnappyData code written for Spark 2.1 works correctly

* Fixed a scala style failure
* Change maxTaskFailures depending on a property

The local property is set based on the plan from the Snappy side.
If the property is set, maxTaskFailures is set to that value.
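The described lookup can be sketched as follows (the property key and default below are hypothetical, used only for illustration):

```python
# If the local property is set, use its value for maxTaskFailures;
# otherwise keep the default.

DEFAULT_MAX_TASK_FAILURES = 4  # assumed default, for illustration

def max_task_failures(local_properties):
    value = local_properties.get("snappydata.maxTaskFailures")  # hypothetical key
    return int(value) if value is not None else DEFAULT_MAX_TASK_FAILURES

print(max_task_failures({}))                                   # 4
print(max_task_failures({"snappydata.maxTaskFailures": "0"}))  # 0
```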
…ion (#191)

is in-memory while running the pyspark shell.

## What changes were proposed in this pull request?

We initialize `SparkSession` as well as `SnappySession` while starting the pyspark shell.
`SparkSession` and `SparkContext` were always initialized with hive support enabled,
irrespective of the value of the `spark.sql.catalogImplementation` config.

With these changes, we check the value of `spark.sql.catalogImplementation`, and
hive support is not enabled when the value of the above-mentioned property is
explicitly set to `in-memory`.

`SnappySession` will only be initialized when the catalog implementation is set to `in-memory`,
to avoid the failure reported in SNAP-3165.

Later we can provide support for hive catalog implementation for python with SnappySession.
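The startup decision described above can be sketched as follows (simplified; the real shell startup wires actual SparkSession/SnappySession builders, and the function name here is illustrative):

```python
# Decide hive support and SnappySession creation from the catalog config.

def choose_sessions(conf):
    catalog = conf.get("spark.sql.catalogImplementation", "hive")
    enable_hive = catalog != "in-memory"    # hive support only when not in-memory
    create_snappy = catalog == "in-memory"  # SnappySession only for in-memory
    return enable_hive, create_snappy

print(choose_sessions({}))                                                # (True, False)
print(choose_sessions({"spark.sql.catalogImplementation": "in-memory"}))  # (False, True)
```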
- Adding code changes to fix the issue SNAP-3266
#193)

* Added change for initial metric name from TIBCO ComputeDB to TIBCO_ComputeDB
…Trigger object (#194)

As part of recent structured streaming UI, we introduced a new field for
`Trigger` in `QueryStartedEvent`. This field was added to retrieve the configured
trigger interval which can be displayed on UI on one of the charts.

Since `org.apache.spark.sql.streaming.Trigger` is a trait, JSON deserialization
of it would require custom deserialization logic. Writing that would be
overkill, as the `Trigger` trait has only one implementation as
of now, which is `ProcessingTime`, and all `ProcessingTime` contains is the trigger
interval, which is of `Long` type. Hence, instead of passing the whole object as
part of the event, we now pass only the trigger interval.
## What changes were proposed in this pull request?

For the streaming UI, `SnappyStreamingQueryListener` is registered on the listener bus
at the time of creating a `SnappySession`. However, this listener is never removed
from the listener bus. Hence, even if the `SnappySession` is collected by GC, the
`SnappyStreamingQueryListener` is left orphaned on the listener bus and is never
eligible for GC.

To fix this, we remove the listener from the listener bus when the session
instance is getting collected by GC (i.e. in the finalize method), which makes
the listener instance eligible for GC during the next GC cycle.

It should be OK if the listener instance only gets collected in the next GC cycle, as the
memory footprint of the listener object is not big.

Another possible place to remove the listener is the `close` method of the session;
however, a session is not required to be closed explicitly.
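The leak and the fix can be modelled in a few lines (an illustrative Python sketch with hypothetical class names; `__del__` plays the role of Java's `finalize`):

```python
# A strong listener reference on the bus keeps the listener alive after the
# session dies; removing it on session finalization makes it collectable.
import gc

class ListenerBus:
    def __init__(self):
        self.listeners = []

class Listener:
    pass

class Session:
    def __init__(self, bus):
        self.bus = bus
        self.listener = Listener()
        bus.listeners.append(self.listener)

    def __del__(self):  # analogue of the finalize() fix
        self.bus.listeners.remove(self.listener)

bus = ListenerBus()
s = Session(bus)
print(len(bus.listeners))  # 1
del s                      # session collected -> listener removed from the bus
gc.collect()
print(len(bus.listeners))  # 0
```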

## How was this patch tested?

Reproduced the issue by running the following code as part of a snappy job: 

```
while (true) {
  session.newSession()
}
```

Collected a histogram of the leader process using `jmap` and observed that instances of
`SnappyStreamingQueryListener` increase indefinitely and are never garbage
collected, whereas `SnappySession` instances are garbage collected:

`jmap -histo:live <leader pid>|grep "SnappySession\|SnappyStreamingQueryListener"`

Followed the same steps after applying the changes and noticed that
 `SnappyStreamingQueryListener` instances are garbage collected.

--- 

- `SnappyStreamingQueryListenerSuite` is passing now. Added a call to `finalize`
method in `StreamingQueryListenerSuite` to get this change tested.
#196)

 - Data Table config parameter name is changed from "iDisplayLength" to "pageLength".
 - Streaming Queries navigation list, sources and sink tables config parameter "pageLength" is
   set to display all entries in it.  
 - Chart title changed from "Aggregation States" to "Aggregation State"
@ahshahid ahshahid merged commit 5a4fa89 into master Dec 15, 2019
@ahshahid ahshahid deleted the SNAP-3269 branch December 15, 2019 17:52
ahshahid added a commit that referenced this pull request Dec 15, 2019
This reverts commit 5a4fa89.