Commit 52cb321

Add the ability to partition TSV by app id

1 parent: 6eb910d
File tree: 5 files changed, +57 -24 lines changed

modules/main/src/main/scala/com/snowplowanalytics/s3/loader/connector/KinesisS3Emitter.scala (+9 -4)

@@ -52,6 +52,8 @@ class KinesisS3Emitter(client: AmazonS3,
                        serializer: ISerializer)
     extends IEmitter[Result] {
 
+  private val partitionTsvByApp = output.s3.partitionForPurpose(purpose).exists(_.contains("{app}"))
+
   /**
    * Reads items from a buffer and saves them to s3.
    *
@@ -67,9 +69,9 @@ class KinesisS3Emitter(client: AmazonS3,
 
     val records = buffer.getRecords.asScala.toList
     val partitionedBatch =
-      Common.partition(purpose, monitoring.isStatsDEnabled, records)
+      Common.partition(purpose, partitionTsvByApp, monitoring.isStatsDEnabled, records)
 
-    val getBase: Option[RowType.SelfDescribing] => String =
+    val getBase: Option[RowType] => String =
       getBaseFilename(output.s3, purpose, buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, LocalDateTime.now)
     val afterEmit: () => Unit =
       () => monitoring.report(partitionedBatch.meta)
@@ -78,7 +80,7 @@ class KinesisS3Emitter(client: AmazonS3,
       case (RowType.Unpartitioned, partitionRecords) if partitionRecords.nonEmpty =>
         emitRecords(partitionRecords, afterEmit, getBase(None))
           .map(_.asLeft)
-      case (data: RowType.SelfDescribing, partitionRecords) if partitionRecords.nonEmpty =>
+      case (data @ (_: RowType.SelfDescribing | _: RowType.TsvPerApp), partitionRecords) if partitionRecords.nonEmpty =>
         emitRecords(partitionRecords, afterEmit, getBase(Some(data))).map(_.asLeft)
       case _ =>
         records // ReadingError or empty partition - should be handled later by serializer
@@ -238,14 +240,17 @@ object KinesisS3Emitter {
     lastSeq: String,
     now: LocalDateTime
   )(
-    sdj: Option[RowType.SelfDescribing]
+    row: Option[RowType]
   ): String = {
+    val sdj = row.collect { case s: RowType.SelfDescribing => s }
+    val app = row.collect { case a: RowType.TsvPerApp => a }
     val partitionPath = s3Config.partitionForPurpose(purpose).map {
       _.template("vendor", sdj.fold("unknown")(_.vendor))
         .template("name", sdj.fold("unknown")(_.name))
         .template("schema", sdj.fold("unknown")(_.name)) // allowed synonym
         .template("format", sdj.fold("unknown")(_.format))
         .template("model", sdj.fold(-1)(_.model).toString)
+        .template("app", app.fold("unknown")(_.appId))
         .template("yy+", now.format(DateTimeFormatter.ofPattern("yyyy")))
         .template("mm", now.format(DateTimeFormatter.ofPattern("MM")))
         .template("dd", now.format(DateTimeFormatter.ofPattern("dd")))

modules/main/src/main/scala/com/snowplowanalytics/s3/loader/package.scala (+5 -0)

@@ -31,4 +31,9 @@ package object loader {
    * Final result of S3 Loader processing
    */
   type Result = Either[GenericError, RawRecord]
+
+  /**
+   * The result of S3 Loader processing with a potentially parsed record
+   */
+  type ParsedResult = Either[GenericError, (RawRecord, Option[Array[String]])]
 }
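
For illustration, the two shapes a successful ParsedResult can take (a sketch assuming RawRecord is the loader's raw byte array; the values are made up):

import java.nio.charset.StandardCharsets.UTF_8

val raw: Array[Byte] = "app1\tweb".getBytes(UTF_8)
val skipped: ParsedResult = Right((raw, None))                        // parsing was not needed
val split:   ParsedResult = Right((raw, Some(Array("app1", "web")))) // pre-split TSV fields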

modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Batch.scala (+5 -8)

@@ -13,9 +13,7 @@
 package com.snowplowanalytics.s3.loader.processing
 
 import java.time.Instant
-import java.nio.charset.StandardCharsets.UTF_8
-
-import com.snowplowanalytics.s3.loader.Result
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
 import com.snowplowanalytics.s3.loader.processing.Batch.Meta
 
 /** Content of a KCL buffer with metadata attached */
@@ -34,20 +32,19 @@ object Batch {
 
   val EmptyMeta: Meta = Meta(None, 0)
 
-  def fromEnriched(inputs: List[Result]): Batch[List[Result]] = {
+  def fromEnriched(inputs: List[ParsedResult]): Batch[List[ParsedResult]] = {
     val meta = inputs.foldLeft(EmptyMeta) {
       case (Meta(tstamp, count), Left(_)) =>
         Meta(tstamp, count + 1)
-      case (Meta(tstamp, count), Right(raw)) =>
-        val strRecord = new String(raw, UTF_8)
-        val extracted = Common.getTstamp(strRecord).toOption
+      case (Meta(tstamp, count), Right((_, array))) =>
+        val extracted = array.flatMap(Common.getTstamp(_).toOption)
         val min = Common.compareTstamps(tstamp, extracted)
         Meta(min, count + 1)
     }
 
     Batch(meta, inputs)
   }
 
-  def from(inputs: List[Result]): Batch[List[Result]] =
+  def from[R](inputs: List[R]): Batch[List[R]] =
     Batch(EmptyMeta, inputs)
 }
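
With records now arriving pre-split, fromEnriched reads the collector timestamp straight from the field array instead of decoding and splitting each raw record again, and from becomes generic so it can wrap either record shape. A short sketch with a made-up record:

// Illustrative enriched TSV prefix; real events carry many more columns.
val line = "app1\tweb\t2021-06-15 11:59:59.000\t2021-06-15 12:00:00.000"
val rec: ParsedResult = Right((line.getBytes("UTF-8"), Some(line.split("\t", -1))))

val withMeta = Batch.fromEnriched(List(rec)) // counts the record and tracks the minimum collector tstamp
val plain    = Batch.from(List(rec))         // generic: attaches EmptyMeta to any record list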

modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/Common.scala (+35 -12)

@@ -14,15 +14,11 @@ package com.snowplowanalytics.s3.loader.processing
 
 import java.time.Instant
 import java.nio.charset.StandardCharsets.UTF_8
-
 import cats.syntax.either._
-
 import io.circe.parser.parse
-
 import com.snowplowanalytics.iglu.core.SchemaKey
 import com.snowplowanalytics.iglu.core.circe.implicits._
-
-import com.snowplowanalytics.s3.loader.Result
+import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
 import com.snowplowanalytics.s3.loader.Config.Purpose
 import com.snowplowanalytics.s3.loader.monitoring.StatsD.CollectorTstampIdx
 
@@ -39,15 +35,26 @@ object Common {
    * @param records raw records themselves
    */
   def partition(
-    purpose: Purpose,
-    statsDEnabled: Boolean,
-    records: List[Result]
+    purpose: Purpose,
+    partitionTsvByApp: Boolean,
+    statsDEnabled: Boolean,
+    records: List[Result]
   ): Batch.Partitioned =
     purpose match {
       case Purpose.SelfDescribingJson =>
         Batch.from(records).map(rs => partitionByType(rs).toList)
-      case Purpose.Enriched if statsDEnabled =>
-        Batch.fromEnriched(records).map(rs => List((RowType.Unpartitioned, rs)))
+      case Purpose.Enriched =>
+        val parsed = records.map(toParsedRecord(_, actuallyParse = statsDEnabled || partitionTsvByApp))
+        val batch = if (statsDEnabled)
+          Batch.fromEnriched(parsed)
+        else
+          Batch.from(parsed)
+        if (partitionTsvByApp)
+          batch.map(rs => partitionByApp(rs).toList.map {
+            case (row, records) => (row, records.map(fromParsedRecord))
+          })
+        else
+          batch.map(rs => List((RowType.Unpartitioned, rs.map(fromParsedRecord))))
       case _ =>
         Batch.from(records).map(rs => List((RowType.Unpartitioned, rs)))
     }
@@ -70,9 +77,25 @@ object Common {
       case Left(_) => RowType.ReadingError
     }
 
+  def toParsedRecord(record: Result, actuallyParse: Boolean): ParsedResult =
+    record.map { byteArray =>
+      val parsed = if (actuallyParse) Some(new String(byteArray, UTF_8).split("\t", -1)) else None
+      (byteArray, parsed)
+    }
+
+  def fromParsedRecord(record: ParsedResult): Result = record.map(_._1)
+
+  def partitionByApp(records: List[ParsedResult]): Map[RowType, List[ParsedResult]] =
+    records.groupBy {
+      case Right((_, array)) =>
+        // if there are no tabs, avoid treating the whole line as an app id
+        val appId = array.flatMap(arr => arr.headOption.filter(_ => arr.length > 1))
+        appId.fold[RowType](RowType.Unpartitioned)(RowType.TsvPerApp)
+      case Left(_) => RowType.ReadingError
+    }
+
   /** Extract a timestamp from enriched TSV line */
-  def getTstamp(row: String): Either[RuntimeException, Instant] = {
-    val array = row.split("\t", -1)
+  def getTstamp(array: Array[String]): Either[RuntimeException, Instant] = {
     for {
       string <- Either
         .catchOnly[IndexOutOfBoundsException](array(CollectorTstampIdx))
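
To see partitionByApp at work on made-up records: rows group by their first TSV field (the app id), a line with no tabs stays Unpartitioned rather than being treated as an app id, and failed reads group under ReadingError. A sketch with illustrative values:

import java.nio.charset.StandardCharsets.UTF_8

def rec(line: String): ParsedResult =
  Right((line.getBytes(UTF_8), Some(line.split("\t", -1))))

val grouped = Common.partitionByApp(List(
  rec("app1\tweb"),     // grouped under TsvPerApp("app1")
  rec("app1\tmob"),     // same key, lands under the same S3 prefix
  rec("no-tabs-line")   // single field: stays Unpartitioned
))
// grouped.keySet == Set(RowType.TsvPerApp("app1"), RowType.Unpartitioned)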

modules/main/src/main/scala/com/snowplowanalytics/s3/loader/processing/RowType.scala (+3 -0)

@@ -23,6 +23,9 @@ object RowType {
    */
   case object Unpartitioned extends RowType
 
+  /** TSV line partitioned by app id */
+  final case class TsvPerApp(appId: String) extends RowType
+
   /** JSON line with self-describing payload that can be partitioned */
   final case class SelfDescribing(vendor: String, name: String, format: String, model: Int) extends RowType
 