Skip to content

Commit

Permalink
SNOW-1661955 extend logs for QueryArrowStream (#1222)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-pfus authored Oct 28, 2024
1 parent 1a90e21 commit f7fee89
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 10 deletions.
1 change: 1 addition & 0 deletions arrow_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (arc *arrowResultChunk) decodeArrowChunk(ctx context.Context, rowType []exe

start := len(chunkRows)
numRows := int(record.NumRows())
logger.Debugf("rows in current record: %v", numRows)
columns := record.Columns()
chunkRows = append(chunkRows, make([]chunkRowType, numRows)...)
for i := start; i < start+numRows; i++ {
Expand Down
21 changes: 14 additions & 7 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (sc *snowflakeConn) exec(
}
}

logger.WithContext(ctx).Info("Exec/Query SUCCESS")
logger.WithContext(ctx).Infof("Exec/Query SUCCESS with total=%v, returned=%v", data.Data.Total, data.Data.Returned)
if data.Data.FinalDatabaseName != "" {
sc.cfg.Database = data.Data.FinalDatabaseName
}
Expand Down Expand Up @@ -698,27 +698,29 @@ func (scd *snowflakeArrowStreamChunkDownloader) JSONData() [][]*string {

// the server might have had an empty first batch, check if we can decode
// that first batch, if not we skip it.
func (scd *snowflakeArrowStreamChunkDownloader) maybeFirstBatch() []byte {
func (scd *snowflakeArrowStreamChunkDownloader) maybeFirstBatch() ([]byte, error) {
if scd.RowSet.RowSetBase64 == "" {
return nil
return nil, nil
}

// first batch
rowSetBytes, err := base64.StdEncoding.DecodeString(scd.RowSet.RowSetBase64)
if err != nil {
// match logic in buildFirstArrowChunk
// assume there's no first chunk if we can't decode the base64 string
return nil
logger.Warnf("skipping first batch as it is not a valid base64 response. %v", err)
return nil, err
}

// verify it's a valid ipc stream, otherwise skip it
rr, err := ipc.NewReader(bytes.NewReader(rowSetBytes))
if err != nil {
return nil
logger.Warnf("skipping first batch as it is not a valid IPC stream. %v", err)
return nil, err
}
rr.Release()

return rowSetBytes
return rowSetBytes, nil
}

func (scd *snowflakeArrowStreamChunkDownloader) GetBatches() (out []ArrowStreamBatch, err error) {
Expand All @@ -727,7 +729,10 @@ func (scd *snowflakeArrowStreamChunkDownloader) GetBatches() (out []ArrowStreamB

out = make([]ArrowStreamBatch, chunkMetaLen, chunkMetaLen+1)
toFill := out
rowSetBytes := scd.maybeFirstBatch()
rowSetBytes, err := scd.maybeFirstBatch()
if err != nil {
return nil, err
}
// if there was no first batch in the response from the server,
// skip it and move on. toFill == out
// otherwise expand out by one to account for the first batch
Expand All @@ -751,12 +756,14 @@ func (scd *snowflakeArrowStreamChunkDownloader) GetBatches() (out []ArrowStreamB
Loc: loc,
scd: scd,
}
logger.Debugf("batch %v, numrows: %v", i, toFill[i].numrows)
totalCounted += int64(scd.ChunkMetas[i].RowCount)
}

if len(rowSetBytes) > 0 {
// if we had a first batch, fill in the numrows
out[0].numrows = scd.Total - totalCounted
logger.Debugf("first batch, numrows: %v", out[0].numrows)
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion connection_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func writeFileStream(ctx context.Context, streamBuf *bytes.Buffer) error {

func (sc *snowflakeConn) populateSessionParameters(parameters []nameValueParameter) {
// other session parameters (not all)
logger.WithContext(sc.ctx).Infof("params: %#v", parameters)
logger.WithContext(sc.ctx).Tracef("params: %#v", parameters)
for _, param := range parameters {
v := ""
switch param.Value.(type) {
Expand Down
4 changes: 2 additions & 2 deletions ocsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func fullOCSPURL(url *url.URL) string {

// getRevocationStatus checks the certificate revocation status for subject using issuer certificate.
func getRevocationStatus(ctx context.Context, subject, issuer *x509.Certificate) *ocspStatus {
logger.WithContext(ctx).Infof("Subject: %v, Issuer: %v", subject.Subject, issuer.Subject)
logger.WithContext(ctx).Tracef("Subject: %v, Issuer: %v", subject.Subject, issuer.Subject)

status, ocspReq, encodedCertID := validateWithCache(subject, issuer)
if isValidOCSPStatus(status.code) {
Expand Down Expand Up @@ -955,7 +955,7 @@ func extractOCSPCacheResponseValue(certIDKey *certIDKey, certCacheValue *certCac
status = validateOCSP(ocspResponse)
ocspParsedRespCache[cacheKey] = status
}
logger.Debugf("OCSP status found in cache: %v; certIdKey: %v", status, certIDKey)
logger.Tracef("OCSP status found in cache: %v; certIdKey: %v", status, certIDKey)
return status
}

Expand Down

0 comments on commit f7fee89

Please # to comment.