-
Notifications
You must be signed in to change notification settings - Fork 527
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #521 from mesos/gk/fix_520
Fixes #520 - fetch the frameworkId from the state store on MesosDriver creation
- Loading branch information
Showing
8 changed files
with
172 additions
and
127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
72 changes: 19 additions & 53 deletions
72
src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosDriverFactory.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,93 +1,59 @@ | ||
package org.apache.mesos.chronos.scheduler.mesos | ||
|
||
import java.io.{ FileInputStream, IOException } | ||
import java.nio.file.attribute.PosixFilePermission | ||
import java.nio.file.{ Files, Paths } | ||
import java.util.logging.Logger | ||
|
||
import com.google.protobuf.ByteString | ||
import org.apache.mesos.Protos.{ Credential, FrameworkInfo, Status } | ||
import mesosphere.chaos.http.HttpConf | ||
import mesosphere.mesos.util.FrameworkIdUtil | ||
import org.apache.mesos.Protos.{ FrameworkID, Status } | ||
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration | ||
import org.apache.mesos.{ MesosSchedulerDriver, Scheduler, SchedulerDriver } | ||
|
||
import scala.collection.JavaConverters.asScalaSetConverter | ||
import org.apache.mesos.{ Scheduler, SchedulerDriver } | ||
|
||
/** | ||
* The chronos driver doesn't allow calling the start() method after stop() has been called, thus we need a factory to | ||
* create a new driver once we call stop() - which will be called if the leader abdicates or is no longer a leader. | ||
* @author Florian Leibert (flo@leibert.de) | ||
*/ | ||
class MesosDriverFactory(val mesosScheduler: Scheduler, val frameworkInfo: FrameworkInfo, val config: SchedulerConfiguration) { | ||
class MesosDriverFactory( | ||
scheduler: Scheduler, | ||
frameworkIdUtil: FrameworkIdUtil, | ||
config: SchedulerConfiguration with HttpConf, | ||
schedulerDriverBuilder: SchedulerDriverBuilder = new SchedulerDriverBuilder) { | ||
|
||
private[this] val log = Logger.getLogger(getClass.getName) | ||
|
||
var mesosDriver: Option[SchedulerDriver] = None | ||
|
||
def start() { | ||
def start(): Unit = { | ||
val status = get().start() | ||
if (status != Status.DRIVER_RUNNING) { | ||
log.severe(s"MesosSchedulerDriver start resulted in status:$status. Committing suicide!") | ||
log.severe(s"MesosSchedulerDriver start resulted in status: $status. Committing suicide!") | ||
System.exit(1) | ||
} | ||
} | ||
|
||
def get(): SchedulerDriver = { | ||
if (mesosDriver.isEmpty) { | ||
makeDriver() | ||
mesosDriver = Some(makeDriver()) | ||
} | ||
mesosDriver.get | ||
} | ||
|
||
def makeDriver() { | ||
|
||
val driver = config.mesosAuthenticationPrincipal.get match { | ||
case Some(principal) => | ||
val credential = buildMesosCredentials(principal, config.mesosAuthenticationSecretFile.get) | ||
new MesosSchedulerDriver(mesosScheduler, frameworkInfo, config.master(), credential) | ||
case None => | ||
new MesosSchedulerDriver(mesosScheduler, frameworkInfo, config.master()) | ||
} | ||
|
||
mesosDriver = Option(driver) | ||
} | ||
|
||
def close() { | ||
def close(): Unit = { | ||
assert(mesosDriver.nonEmpty, "Attempted to close a non initialized driver") | ||
if (mesosDriver.isEmpty) { | ||
log.severe("Attempted to close a non initialized driver") | ||
System.exit(1) | ||
} | ||
|
||
mesosDriver.get.stop(true) | ||
mesosDriver = None | ||
} | ||
|
||
private[this] def makeDriver(): SchedulerDriver = { | ||
import scala.concurrent.ExecutionContext.Implicits.global | ||
import mesosphere.util.BackToTheFuture.Implicits.defaultTimeout | ||
|
||
/** | ||
* Create the optional credentials instance, used to authenticate calls from Chronos to Mesos. | ||
*/ | ||
def buildMesosCredentials(principal: String, secretFile: Option[String]): Credential = { | ||
|
||
val credentialBuilder = Credential.newBuilder() | ||
.setPrincipal(principal) | ||
|
||
secretFile foreach { file => | ||
try { | ||
val secretBytes = ByteString.readFrom(new FileInputStream(file)) | ||
|
||
val filePermissions = Files.getPosixFilePermissions(Paths.get(file)).asScala | ||
if (!(filePermissions & Set(PosixFilePermission.OTHERS_READ, PosixFilePermission.OTHERS_WRITE)).isEmpty) | ||
log.warning(s"Secret file $file should not be globally accessible.") | ||
|
||
credentialBuilder.setSecret(secretBytes) | ||
} | ||
catch { | ||
case cause: Throwable => | ||
throw new IOException(s"Error reading authentication secret from file [$file]", cause) | ||
} | ||
} | ||
|
||
credentialBuilder.build() | ||
val maybeFrameworkID: Option[FrameworkID] = frameworkIdUtil.fetch | ||
schedulerDriverBuilder.newDriver(config, maybeFrameworkID, scheduler) | ||
} | ||
|
||
|
||
} |
88 changes: 88 additions & 0 deletions
88
src/main/scala/org/apache/mesos/chronos/scheduler/mesos/SchedulerDriverBuilder.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package org.apache.mesos.chronos.scheduler.mesos | ||
|
||
import java.io.{ IOException, FileInputStream } | ||
import java.nio.file.attribute.PosixFilePermission | ||
import java.nio.file.{ Paths, Files } | ||
import java.util.logging.Logger | ||
|
||
import com.google.protobuf.ByteString | ||
import mesosphere.chaos.http.HttpConf | ||
import org.apache.mesos.Protos.{ Credential, FrameworkID, FrameworkInfo } | ||
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration | ||
import org.apache.mesos.{ Protos, MesosSchedulerDriver, Scheduler, SchedulerDriver } | ||
|
||
import scala.collection.JavaConverters.asScalaSetConverter | ||
|
||
class SchedulerDriverBuilder { | ||
private[this] val log = Logger.getLogger(getClass.getName) | ||
|
||
def newDriver(config: SchedulerConfiguration with HttpConf, | ||
frameworkId: Option[FrameworkID], | ||
scheduler: Scheduler): SchedulerDriver = { | ||
def buildCredentials(principal: String, secretFile: String): Credential = { | ||
val credentialBuilder = Credential.newBuilder().setPrincipal(principal) | ||
|
||
try { | ||
val secretBytes = ByteString.readFrom(new FileInputStream(secretFile)) | ||
|
||
val filePermissions = Files.getPosixFilePermissions(Paths.get(secretFile)).asScala | ||
if ((filePermissions & Set(PosixFilePermission.OTHERS_READ, PosixFilePermission.OTHERS_WRITE)).nonEmpty) | ||
log.warning(s"Secret file $secretFile should not be globally accessible.") | ||
|
||
credentialBuilder.setSecret(secretBytes) | ||
} | ||
catch { | ||
case cause: Throwable => | ||
throw new IOException(s"Error reading authentication secret from file [$secretFile]", cause) | ||
} | ||
|
||
credentialBuilder.build() | ||
} | ||
|
||
def buildFrameworkInfo(config: SchedulerConfiguration with HttpConf, | ||
frameworkId: Option[FrameworkID]): Protos.FrameworkInfo = { | ||
val frameworkInfoBuilder = FrameworkInfo.newBuilder() | ||
.setName(config.mesosFrameworkName()) | ||
.setCheckpoint(config.mesosCheckpoint()) | ||
.setRole(config.mesosRole()) | ||
.setFailoverTimeout(config.failoverTimeoutSeconds()) | ||
.setUser(config.user()) | ||
|
||
// Set the ID, if provided | ||
frameworkId.foreach(frameworkInfoBuilder.setId) | ||
|
||
if (config.webuiUrl.isSupplied) { | ||
frameworkInfoBuilder.setWebuiUrl(config.webuiUrl()) | ||
} | ||
else if (config.sslKeystorePath.isDefined) { | ||
// ssl enabled, use https | ||
frameworkInfoBuilder.setWebuiUrl(s"https://${config.hostname()}:${config.httpsPort()}") | ||
} | ||
else { | ||
// ssl disabled, use http | ||
frameworkInfoBuilder.setWebuiUrl(s"http://${config.hostname()}:${config.httpPort()}") | ||
} | ||
|
||
// set the authentication principal, if provided | ||
config.mesosAuthenticationPrincipal.get.foreach(frameworkInfoBuilder.setPrincipal) | ||
|
||
frameworkInfoBuilder.build() | ||
} | ||
|
||
val frameworkInfo: FrameworkInfo = buildFrameworkInfo(config, frameworkId) | ||
|
||
val credential: Option[Credential] = config.mesosAuthenticationPrincipal.get.flatMap { principal => | ||
config.mesosAuthenticationSecretFile.get.map { secretFile => | ||
buildCredentials(principal, secretFile) | ||
} | ||
} | ||
|
||
credential match { | ||
case Some(cred) => | ||
new MesosSchedulerDriver(scheduler, frameworkInfo, config.master(), cred) | ||
|
||
case None => | ||
new MesosSchedulerDriver(scheduler, frameworkInfo, config.master()) | ||
} | ||
} | ||
} |
16 changes: 16 additions & 0 deletions
16
src/test/scala/org/apache/mesos/chronos/ChronosTestHelper.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package org.apache.mesos.chronos | ||
|
||
import mesosphere.chaos.http.HttpConf | ||
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration | ||
import org.rogach.scallop.ScallopConf | ||
|
||
object ChronosTestHelper { | ||
def makeConfig(args: String*): SchedulerConfiguration with HttpConf = { | ||
val opts = new ScallopConf(args) with SchedulerConfiguration with HttpConf { | ||
// scallop will trigger sys exit | ||
override protected def onError(e: Throwable): Unit = throw e | ||
} | ||
opts.afterInit() | ||
opts | ||
} | ||
} |
21 changes: 5 additions & 16 deletions
21
src/test/scala/org/apache/mesos/chronos/scheduler/jobs/TaskManagerSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 28 additions & 0 deletions
28
src/test/scala/org/apache/mesos/chronos/scheduler/mesos/MesosDriverFactorySpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package org.apache.mesos.chronos.scheduler.mesos | ||
|
||
import mesosphere.mesos.util.FrameworkIdUtil | ||
import org.apache.mesos.Scheduler | ||
import org.apache.mesos.chronos.ChronosTestHelper._ | ||
import org.specs2.mock.Mockito | ||
import org.specs2.mutable.SpecificationWithJUnit | ||
|
||
class MesosDriverFactorySpec extends SpecificationWithJUnit with Mockito { | ||
"MesosDriverFactorySpec" should { | ||
"always fetch the frameworkId from the state store before creating a driver" in { | ||
val scheduler: Scheduler = mock[Scheduler] | ||
val frameworkIdUtil: FrameworkIdUtil = mock[FrameworkIdUtil] | ||
val mesosDriverFactory = new MesosDriverFactory( | ||
scheduler, | ||
frameworkIdUtil, | ||
makeConfig(), | ||
mock[SchedulerDriverBuilder]) | ||
|
||
frameworkIdUtil.fetch(any, any).returns(None) | ||
|
||
mesosDriverFactory.get() | ||
|
||
there was one(frameworkIdUtil).fetch(any, any) | ||
ok | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.