Skip to content

[TEST][NO-MERGE] Stress test domain sockets #382

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package protocbridge.frontend

import org.newsclub.net.unix.AFUNIXServerSocket
import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.net.ServerSocket
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.{Files, Path}
import java.{util => ju}

/** PluginFrontend for macOS.
*
* Creates a server socket and uses `nc` to communicate with the socket. We use
* a server socket instead of named pipes because named pipes are unreliable on
* macOS: https://github.com/scalapb/protoc-bridge/issues/366
*
* Since `nc` is widely available on macOS, this is the simplest alternative
* for macOS. However, raw `nc` is also not very reliable on macOS:
* https://github.com/scalapb/protoc-bridge/issues/379
*
* The most reliable way to communicate is found to be with a domain socket and
* a server-side read timeout, which are implemented here.
*/
object MacPluginFrontend extends SocketBasedPluginFrontend {
case class InternalState(
shellScript: Path,
tempDirPath: Path,
socketPath: Path,
serverSocket: ServerSocket
)

override def prepare(
plugin: ProtocCodeGenerator,
env: ExtraEnv
): (Path, InternalState) = {
val tempDirPath = Files.createTempDirectory("protocbridge")
val socketPath = tempDirPath.resolve("socket")
val serverSocket = AFUNIXServerSocket.bindOn(socketPath, true)
val sh = createShellScript(socketPath)

runWithSocket(plugin, env, serverSocket)

(sh, InternalState(sh, tempDirPath, socketPath, serverSocket))
}

override def cleanup(state: InternalState): Unit = {
state.serverSocket.close()
if (sys.props.get("protocbridge.debug") != Some("1")) {
Files.delete(state.tempDirPath)
Files.delete(state.shellScript)
}
}

private def createShellScript(socketPath: Path): Path = {
val shell = sys.env.getOrElse("PROTOCBRIDGE_SHELL", "/bin/sh")
val scriptName = PluginFrontend.createTempFile(
"",
s"""|#!$shell
|set -e
|nc -U "$socketPath"
""".stripMargin
)
val perms = new ju.HashSet[PosixFilePermission]
perms.add(PosixFilePermission.OWNER_EXECUTE)
perms.add(PosixFilePermission.OWNER_READ)
Files.setPosixFilePermissions(
scriptName,
perms
)
scriptName
}
}
25 changes: 15 additions & 10 deletions bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import java.nio.file.{Files, Path}

import protocbridge.{ProtocCodeGenerator, ExtraEnv}

import scala.util.Try

/** A PluginFrontend instance provides a platform-dependent way for protoc to
* communicate with a JVM based ProtocCodeGenerator.
*
Expand Down Expand Up @@ -47,13 +45,7 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
request: Array[Byte]
): Array[Byte] = {
Try {
gen.run(request)
}.recover { case throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}.get
gen.run(request)
}

def createCodeGeneratorResponseWithError(error: String): Array[Byte] = {
Expand Down Expand Up @@ -116,9 +108,17 @@ object PluginFrontend {
gen: ProtocCodeGenerator,
fsin: InputStream,
env: ExtraEnv
): Array[Byte] = {
): Array[Byte] = try {
val bytes = readInputStreamToByteArrayWithEnv(fsin, env)
runWithBytes(gen, bytes)
} catch {
// This covers all Throwable including OutOfMemoryError, StackOverflowError, etc.
// We need to make a best effort to return a response to protoc,
// otherwise protoc can hang indefinitely.
case throwable: Throwable =>
createCodeGeneratorResponseWithError(
throwable.toString + "\n" + getStackTrace(throwable)
)
}

def createTempFile(extension: String, content: String): Path = {
Expand All @@ -131,8 +131,13 @@ object PluginFrontend {

def isWindows: Boolean = sys.props("os.name").startsWith("Windows")

def isMac: Boolean = sys.props("os.name").startsWith("Mac") || sys
.props("os.name")
.startsWith("Darwin")

def newInstance: PluginFrontend = {
if (isWindows) WindowsPluginFrontend
else if (isMac) MacPluginFrontend
else PosixPluginFrontend
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.sys.process._
import java.{util => ju}

/** PluginFrontend for Unix-like systems (Linux, Mac, etc)
/** PluginFrontend for Unix-like systems <b>except macOS</b> (Linux, FreeBSD,
* etc)
*
* Creates a pair of named pipes for input/output and a shell script that
* communicates with them.
* communicates with them. Compared with `SocketBasedPluginFrontend`, this
* frontend doesn't rely on `nc` that might not be available in some
* distributions.
*/
object PosixPluginFrontend extends PluginFrontend {
case class InternalState(
Expand All @@ -40,6 +43,11 @@ object PosixPluginFrontend extends PluginFrontend {
val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
fsin.close()

// Note that the output pipe must be opened after the input pipe is consumed.
// Otherwise, there might be a deadlock that
// - The shell script is stuck writing to the input pipe (which has a full buffer),
// and doesn't open the write end of the output pipe.
// - This thread is stuck waiting for the write end of the output pipe to be opened.
val fsout = Files.newOutputStream(outputPipe)
fsout.write(response)
fsout.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package protocbridge.frontend

import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.net.ServerSocket
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, blocking}

/** PluginFrontend for Windows and macOS where a server socket is used.
*/
abstract class SocketBasedPluginFrontend extends PluginFrontend {

protected def runWithSocket(
plugin: ProtocCodeGenerator,
env: ExtraEnv,
serverSocket: ServerSocket
): Unit = {
Future {
blocking {
// Accept a single client connection from the shell script.
val client = serverSocket.accept()
// It's found on macOS that a `junixsocket` domain socket server
// might not receive the EOF sent by the other end, leading to a hang:
// https://github.com/scalapb/protoc-bridge/issues/379
// However, confusingly, adding an arbitrary read timeout resolves the issue.
// We thus add a read timeout of 1 minute here, which should be more than enough.
// It also helps to prevent an infinite hang on both Windows and macOS due to
// unexpected issues.
// client.setSoTimeout(60000)
try {
val response =
PluginFrontend.runWithInputStream(
plugin,
client.getInputStream,
env
)
client.getOutputStream.write(response)
} finally {
client.close()
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,53 +1,38 @@
package protocbridge.frontend

import protocbridge.{ExtraEnv, ProtocCodeGenerator}

import java.net.ServerSocket
import java.nio.file.{Files, Path, Paths}

import protocbridge.ExtraEnv
import protocbridge.ProtocCodeGenerator

import scala.concurrent.blocking

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/** A PluginFrontend that binds a server socket to a local interface. The plugin
* is a batch script that invokes BridgeApp.main() method, in a new JVM with
* the same parameters as the currently running JVM. The plugin will
* communicate its stdin and stdout to this socket.
*/
object WindowsPluginFrontend extends PluginFrontend {

case class InternalState(batFile: Path)
object WindowsPluginFrontend extends SocketBasedPluginFrontend {
case class InternalState(shellScript: Path, serverSocket: ServerSocket)

override def prepare(
plugin: ProtocCodeGenerator,
env: ExtraEnv
): (Path, InternalState) = {
val ss = new ServerSocket(0)
val state = createWindowsScript(ss.getLocalPort)
val ss = new ServerSocket(0) // Bind to any available port.
val sh = createShellScript(ss.getLocalPort)

Future {
blocking {
val client = ss.accept()
val response =
PluginFrontend.runWithInputStream(plugin, client.getInputStream, env)
client.getOutputStream.write(response)
client.close()
ss.close()
}
}
runWithSocket(plugin, env, ss)

(state.batFile, state)
(sh, InternalState(sh, ss))
}

override def cleanup(state: InternalState): Unit = {
state.serverSocket.close()
if (sys.props.get("protocbridge.debug") != Some("1")) {
Files.delete(state.batFile)
Files.delete(state.shellScript)
}
}

private def createWindowsScript(port: Int): InternalState = {
private def createShellScript(port: Int): Path = {
val classPath =
Paths.get(getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
val classPathBatchString = classPath.toString.replace("%", "%%")
Expand All @@ -62,6 +47,6 @@ object WindowsPluginFrontend extends PluginFrontend {
].getName} $port
""".stripMargin
)
InternalState(batchFile)
batchFile
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package protocbridge.frontend

class MacPluginFrontendSpec extends OsSpecificFrontendSpec {
if (PluginFrontend.isMac) {
it must "execute a program that forwards input and output to given stream" in {
val state = testSuccess(MacPluginFrontend)
state.serverSocket.isClosed mustBe true
}

it must "not hang if there is an error in generator" in {
val state = testFailure(MacPluginFrontend)
state.serverSocket.isClosed mustBe true
}
}
}
Loading