Expires Redis Keys based on Feature Table Max Age #1161

Merged · 4 commits · Nov 24, 2020
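In short: every Redis key written by the ingestion job now gets an expiry derived from the feature table's max age. When max_age is set (in seconds), the key expires at event timestamp + max_age; when it is unset, the key is kept without a TTL, as before. A minimal sketch of that rule, for orientation only (the actual implementation is in HashTypePersistence below):

```scala
import java.sql.Timestamp

// Illustrative sketch, not the PR's code: maxAgeSeconds mirrors the feature
// table's max_age option; 0 (the default when maxAge is unset) means the key
// should be kept without a TTL.
def expiryFor(eventTimestamp: Timestamp, maxAgeSeconds: Long): Option[Timestamp] =
  if (maxAgeSeconds > 0)
    Some(new Timestamp(eventTimestamp.getTime + maxAgeSeconds * 1000))
  else
    None // no max age: caller keeps the key forever (PERSIST instead of EXPIREAT)
```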
Changes from all commits
4 changes: 2 additions & 2 deletions spark/ingestion/pom.xml
@@ -182,14 +182,14 @@
<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-scalatest_${scala.version}</artifactId>
-<version>0.38.3</version>
+<version>0.38.6</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.dimafeng</groupId>
<artifactId>testcontainers-scala-kafka_${scala.version}</artifactId>
-<version>0.38.3</version>
+<version>0.38.6</version>
<scope>test</scope>
</dependency>

@@ -72,6 +72,7 @@ object BatchPipeline extends BasePipeline {
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
.option("timestamp_column", config.source.eventTimestampColumn)
.option("max_age", config.featureTable.maxAge.getOrElse(0))
.save()

config.deadLetterPath match {
@@ -17,7 +17,6 @@
package feast.ingestion

import org.joda.time.DateTime

import org.json4s._
import org.json4s.jackson.JsonMethods.{parse => parseJSON}
import org.json4s.ext.JavaEnumNameSerializer
@@ -29,7 +28,7 @@ object IngestionJob {
new JavaEnumNameSerializer[feast.proto.types.ValueProto.ValueType.Enum]() +
ShortTypeHints(List(classOf[ProtoFormat], classOf[AvroFormat]))

-val parser = new scopt.OptionParser[IngestionJobConfig]("IngestionJon") {
+val parser = new scopt.OptionParser[IngestionJobConfig]("IngestionJob") {
// ToDo: read version from Manifest
head("feast.ingestion.IngestionJob", "0.8.0")

@@ -91,7 +91,8 @@ case class FeatureTable(
name: String,
project: String,
entities: Seq[Field],
-features: Seq[Field]
+features: Seq[Field],
+maxAge: Option[Int] = None
)

case class IngestionJobConfig(
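For illustration, this is how the FeatureTable case class above might use the new field; the Field constructor and value types below are assumptions (they are defined outside this diff). maxAge is given in seconds, and None keeps the old never-expire behaviour:

```scala
import feast.proto.types.ValueProto.ValueType

// Hypothetical feature table: entity/feature definitions are illustrative only.
val driverStats = FeatureTable(
  name = "driver_statistics",
  project = "default",
  entities = Seq(Field("driver_id", ValueType.Enum.INT64)),
  features = Seq(Field("avg_daily_trips", ValueType.Enum.FLOAT)),
  maxAge = Some(86400) // rows expire one day after their event timestamp
)
```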
@@ -87,6 +87,7 @@ object StreamingPipeline extends BasePipeline with Serializable {
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
.option("timestamp_column", config.source.eventTimestampColumn)
.option("max_age", config.featureTable.maxAge.getOrElse(0))
.save()

config.deadLetterPath match {
@@ -16,16 +16,17 @@
*/
package feast.ingestion.stores.redis

-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types._
-import redis.clients.jedis.{Pipeline, Response}
import java.nio.charset.StandardCharsets
+import java.util

import com.google.common.hash.Hashing
-
-import scala.jdk.CollectionConverters._
import com.google.protobuf.Timestamp
import feast.ingestion.utils.TypeConversion
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+import redis.clients.jedis.{Pipeline, Response}
+
+import scala.jdk.CollectionConverters._

/**
* Use Redis hash type as storage layout. Every feature is stored as separate entry in Hash.
@@ -35,10 +36,10 @@ import feast.ingestion.utils.TypeConversion
* Values are serialized with protobuf (`ValueProto`).
*/
class HashTypePersistence(config: SparkRedisConfig) extends Persistence with Serializable {
-def encodeRow(
-keyColumns: Array[String],
-timestampField: String,
-value: Row
+
+private def encodeRow(
+value: Row,
+maxExpiryTimestamp: java.sql.Timestamp
): Map[Array[Byte], Array[Byte]] = {
val fields = value.schema.fields.map(_.name)
val types = value.schema.fields.map(f => (f.name, f.dataType)).toMap
@@ -51,49 +52,87 @@ class HashTypePersistence(config: SparkRedisConfig) extends Persistence with Serializable {
}
.filter { case (k, _) =>
// don't store entities & timestamp
-!keyColumns.contains(k) && k != config.timestampColumn
+!config.entityColumns.contains(k) && k != config.timestampColumn
}
.map { case (k, v) =>
encodeKey(k) -> encodeValue(v, types(k))
}

-val timestamp = Seq(
+val timestampHash = Seq(
(
-timestampField.getBytes,
+timestampHashKey(config.namespace).getBytes,
encodeValue(value.getAs[Timestamp](config.timestampColumn), TimestampType)
)
)

-values ++ timestamp
+val expiryUnixTimestamp = {
+if (config.maxAge > 0)
+value.getAs[java.sql.Timestamp](config.timestampColumn).getTime + config.maxAge * 1000
+else maxExpiryTimestamp.getTime
+}
+val expiryTimestamp = new java.sql.Timestamp(expiryUnixTimestamp)
+val expiryTimestampHash = Seq(
+(
+expiryTimestampHashKey(config.namespace).getBytes,
+encodeValue(expiryTimestamp, TimestampType)
+)
+)
+
+values ++ timestampHash ++ expiryTimestampHash
}

-def encodeValue(value: Any, `type`: DataType): Array[Byte] = {
+private def encodeValue(value: Any, `type`: DataType): Array[Byte] = {
TypeConversion.sqlTypeToProtoValue(value, `type`).toByteArray
}

-def encodeKey(key: String): Array[Byte] = {
+private def encodeKey(key: String): Array[Byte] = {
val fullFeatureReference = s"${config.namespace}:$key"
Hashing.murmur3_32.hashString(fullFeatureReference, StandardCharsets.UTF_8).asBytes()
}

-def save(
+private def timestampHashKey(namespace: String): String = {
+s"${config.timestampPrefix}:${namespace}"
+}
+
+private def expiryTimestampHashKey(namespace: String): String = {
+s"${config.expiryPrefix}:${namespace}"
+}
+
+private def decodeTimestamp(encodedTimestamp: Array[Byte]): java.sql.Timestamp = {
+new java.sql.Timestamp(Timestamp.parseFrom(encodedTimestamp).getSeconds * 1000)
+}
+
+override def save(
pipeline: Pipeline,
key: Array[Byte],
-value: Map[Array[Byte], Array[Byte]],
-ttl: Int
+row: Row,
+expiryTimestamp: java.sql.Timestamp,
+maxExpiryTimestamp: java.sql.Timestamp
): Unit = {
-pipeline.hset(key, value.asJava)
-if (ttl > 0) {
-pipeline.expire(key, ttl)
+val value = encodeRow(row, maxExpiryTimestamp).asJava
+pipeline.hset(key, value)
+if (expiryTimestamp.equals(maxExpiryTimestamp)) {
+pipeline.persist(key)
+} else {
+pipeline.expireAt(key, expiryTimestamp.getTime / 1000)
}
}

-def getTimestamp(
+override def get(
pipeline: Pipeline,
-key: Array[Byte],
-timestampField: String
-): Response[Array[Byte]] = {
-pipeline.hget(key, timestampField.getBytes)
+key: Array[Byte]
+): Response[util.Map[Array[Byte], Array[Byte]]] = {
+pipeline.hgetAll(key)
}

+override def storedTimestamp(
+value: util.Map[Array[Byte], Array[Byte]]
+): Option[java.sql.Timestamp] = {
+value.asScala.toMap
+.map { case (key, value) =>
+(key.map(_.toChar).mkString, value)
+}
+.get(timestampHashKey(config.namespace))
+.map(value => decodeTimestamp(value))
+}
}
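With this change each entity hash carries, next to the murmur3-hashed feature fields, two bookkeeping entries per feature table: the stored event timestamp (under config.timestampPrefix) and the new expiry timestamp (under config.expiryPrefix); the prefix values live in SparkRedisConfig and are not part of this diff. save then either PERSISTs the key (expiry equal to maxExpiryTimestamp, i.e. no max age) or issues EXPIREAT at whole-second resolution. A rough sketch of how a single feature field key is derived, following encodeKey above; the namespace and feature name are made up:

```scala
import java.nio.charset.StandardCharsets
import com.google.common.hash.Hashing

// Illustrative only: same hashing scheme as encodeKey, for a hypothetical feature.
val namespace  = "driver_statistics"            // feature table name
val featureRef = s"$namespace:avg_daily_trips"  // "<namespace>:<feature>"
val fieldKey: Array[Byte] =
  Hashing.murmur3_32.hashString(featureRef, StandardCharsets.UTF_8).asBytes()
```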
@@ -16,26 +16,58 @@
*/
package feast.ingestion.stores.redis

+import java.sql.Timestamp
+import java.util

import org.apache.spark.sql.Row
import redis.clients.jedis.{Pipeline, Response}

/**
* Determine how a Spark row should be serialized and stored on Redis.
*/
trait Persistence {
-def encodeRow(
-keyColumns: Array[String],
-timestampField: String,
-value: Row
-): Map[Array[Byte], Array[Byte]]

+/**
+* Persist a Spark row to Redis
+*
+* @param pipeline Redis pipeline
+* @param key Redis key in serialized bytes format
+* @param row Row representing the value to be persisted
+* @param expiryTimestamp Expiry timestamp for the row
+* @param maxExpiryTimestamp No ttl should be set if the expiry timestamp
+* is equal to the maxExpiryTimestamp
+*/
def save(
pipeline: Pipeline,
key: Array[Byte],
-value: Map[Array[Byte], Array[Byte]],
-ttl: Int
+row: Row,
+expiryTimestamp: Timestamp,
+maxExpiryTimestamp: Timestamp
): Unit

-def getTimestamp(
+/**
+* Returns a Redis response, which can be used by `storedTimestamp` and `newExpiryTimestamp` to
+* derive the currently stored event timestamp, and the updated expiry timestamp. This method will
+* be called prior to persisting the row to Redis, so that `RedisSinkRelation` can decide whether
+* the currently stored value should be updated.
+*
+* @param pipeline Redis pipeline
+* @param key Redis key in serialized bytes format
+* @return Redis response representing the row value
+*/
+def get(
pipeline: Pipeline,
-key: Array[Byte],
-timestampField: String
-): Response[Array[Byte]]
+key: Array[Byte]
+): Response[util.Map[Array[Byte], Array[Byte]]]

+/**
+* Returns the currently stored event timestamp for the key and the feature table associated with the ingestion job.
+*
+* @param value Response returned from `get`
+* @return Stored event timestamp associated with the key. Returns `None` if
+* the key is not present in Redis, or if timestamp information is
+* unavailable on the stored value.
+*/
+def storedTimestamp(value: util.Map[Array[Byte], Array[Byte]]): Option[Timestamp]

}
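The comments above describe the intended call sequence for a sink such as RedisSinkRelation: fetch the stored hash, read the stored event timestamp, and only then save the row with a fresh expiry. A simplified sketch of that flow; the skip-if-older rule and the pipeline handling here are assumptions, since RedisSinkRelation itself is not part of this diff:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.Row
import redis.clients.jedis.Pipeline

// Sketch of a caller driving the Persistence trait (not the PR's actual sink code).
def upsert(
    persistence: Persistence,
    pipeline: Pipeline,
    key: Array[Byte],
    row: Row,
    eventTimestamp: Timestamp,
    expiryTimestamp: Timestamp,
    maxExpiryTimestamp: Timestamp
): Unit = {
  val stored = persistence.get(pipeline, key) // queue the read
  pipeline.sync()                             // flush so the Response can be read
  val storedTs = persistence.storedTimestamp(stored.get)
  // Assumed policy: only overwrite if the incoming row is not older than what is stored.
  if (storedTs.forall(ts => !eventTimestamp.before(ts))) {
    persistence.save(pipeline, key, row, expiryTimestamp, maxExpiryTimestamp)
  }
}
```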