Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

airframe-http: Add RPC call logging #1076

Merged
merged 3 commits into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import com.twitter.finagle.http.{HeaderMap, Request, Response}
import com.twitter.finagle.{Service, SimpleFilter}
import com.twitter.util.Future
import wvlet.airframe.http.{HttpHeader, HttpStatus}
import wvlet.airframe.http.finagle.FinagleServer
import wvlet.airframe.http.finagle.filter.HttpAccessLogFilter.{HttpRequestLogger, _}
import wvlet.airframe.http.finagle.{FinagleBackend, FinagleServer}
import wvlet.airframe.http.{HttpHeader, HttpStatus}
import wvlet.airframe.surface.MethodSurface
import wvlet.log.LogTimestampFormatter

import scala.collection.immutable.ListMap
Expand Down Expand Up @@ -139,7 +140,10 @@ object HttpAccessLogFilter {
defaultErrorLogger
)

def defaultContextLoggers: Seq[HttpContextLogger] = Seq.empty
def defaultContextLoggers: Seq[HttpContextLogger] =
Seq(
defaultRPCLogger
)

def unixTimeLogger(request: Request): Map[String, Any] = {
val currentTimeMillis = System.currentTimeMillis()
Expand Down Expand Up @@ -200,6 +204,23 @@ object HttpAccessLogFilter {
m.result
}

def defaultRPCLogger(request: Request): Map[String, Any] = {
val m = ListMap.newBuilder[String, Any]
FinagleBackend.getThreadLocal("rpc").foreach { x: Any =>
x match {
case (methodSurface: MethodSurface, args: Seq[Any]) =>
m += "rpc_method" -> methodSurface.name
m += "rpc_class" -> methodSurface.owner.fullName
val rpcArgs = for ((p, arg) <- methodSurface.args.zip(args)) yield {
p.name -> arg
}
m += "rpc_args" -> rpcArgs.toMap
case _ =>
}
}
m.result()
}

import scala.jdk.CollectionConverters._
private val headerSanitizeCache = new ConcurrentHashMap[String, String]().asScala

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ object HttpAccessLogWriter {

override def write(log: Map[String, Any]): Unit = {
// Generate one-liner JSON log
// TODO: Handle too large log data (e.g., binary data)
val json = mapCodec.toJson(log)
asyncLogHandler.publish(new java.util.logging.LogRecord(Level.INFO, json))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
* limitations under the License.
*/
package wvlet.airframe.http.finagle
import java.io.FileInputStream

import com.twitter.finagle.http.Request
import wvlet.airframe.Design
Expand All @@ -30,7 +29,7 @@ object HttpAccessLogTest extends AirSpec {

trait MyService {
@Endpoint(path = "/user/:id")
def user(id: String) = {
def user(id: Int) = {
s"hello user:${id}"
}
}
Expand Down Expand Up @@ -71,6 +70,11 @@ object HttpAccessLogTest extends AirSpec {
log.get("status_code_name") shouldBe Some(HttpStatus.Ok_200.reason)
// Custom headers
log.get("x_app_version") shouldBe Some("1.0")

// RPC logs
log.get("rpc_method") shouldBe Some("user")
log.get("rpc_class") shouldBe Some("wvlet.airframe.http.finagle.HttpAccessLogTest$MyService")
log.get("rpc_args") shouldBe Some(Map("id" -> 1))
}
}

Expand Down Expand Up @@ -109,6 +113,10 @@ object HttpAccessLogTest extends AirSpec {
log.get("response_time_ms") shouldBe defined
log.get("status_code") shouldBe Some(200)
log.get("status_code_name") shouldBe Some(HttpStatus.Ok_200.reason)

log.get("rpc_method") shouldBe Some("user")
log.get("rpc_class") shouldBe Some("wvlet.airframe.http.finagle.HttpAccessLogTest$MyService")
log.get("rpc_args") shouldBe Some(Map("id" -> 2))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,8 @@
package wvlet.airframe.http.router

import wvlet.airframe.codec.PrimitiveCodec.StringCodec
import wvlet.airframe.codec.{JSONCodec, MessageCodec, MessageCodecFactory}
import wvlet.airframe.http.{
HttpContext,
HttpMessage,
HttpMethod,
HttpMultiMap,
HttpMultiMapCodec,
HttpRequest,
HttpRequestAdapter
}
import wvlet.airframe.codec.{JSONCodec, MessageCodecFactory}
import wvlet.airframe.http._
import wvlet.airframe.json.JSON
import wvlet.airframe.msgpack.spi.MessagePack
import wvlet.airframe.surface.reflect.ReflectMethodSurface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ case class ControllerRoute(
try {
val methodArgs =
HttpRequestMapper.buildControllerMethodArgs(controller, methodSurface, request, context, params, codecFactory)

// Record RPC method arguments
context.setThreadLocal("rpc", (methodSurface, methodArgs))
methodSurface.call(controller, methodArgs: _*)
} catch {
case e: MessageCodecException[_] if e.errorCode == MISSING_PARAMETER =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import wvlet.airframe.http.example._
import wvlet.airframe.http.router.{ControllerProvider, RouteMatcher}
import wvlet.airframe.surface.Surface
import wvlet.airframe.codec.MessageCodecFactory
import wvlet.airframe.http.HttpMessage.{Request, Response}
import wvlet.airspec.AirSpec

/**
Expand Down Expand Up @@ -141,7 +142,9 @@ class RouterTest extends AirSpec {
val ret =
router
.findRoute(request)
.flatMap { m => m.call(session, serviceProvider, request, null, MessageCodecFactory.defaultFactory) }
.flatMap { m =>
m.call(session, serviceProvider, request, HttpContext.mockContext, MessageCodecFactory.defaultFactory)
}

ret shouldBe defined
ret.get shouldBe exepected
Expand Down
27 changes: 27 additions & 0 deletions airframe-http/src/main/scala/wvlet/airframe/http/HttpContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
* limitations under the License.
*/
package wvlet.airframe.http
import wvlet.airframe.http.HttpMessage.{Request, Response}

import scala.concurrent.Future
import scala.language.higherKinds

/***
Expand Down Expand Up @@ -86,4 +89,28 @@ object HttpContext {
}
}

/**
* Mock HttpContext for testing
*/
private[http] def mockContext: HttpContext[Request, Response, Future] = {
new HttpContext[Request, Response, Future] {
override protected def backend: HttpBackend[
Request,
Response,
Future
] = ???

/**
* Process the preceding filters and get the resulting Future[Response]
*/
override def apply(
request: Request
): Future[Response] = ???

override def setThreadLocal[A](key: String, value: A): Unit = {
// no-op
}
override def getThreadLocal[A](key: String): Option[A] = None
}
}
}