Skip to content

Commit

Permalink
[improve][cli] PIP-343: Use picocli instead of jcommander in bin/pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
nodece committed Mar 19, 2024
1 parent 4e0c145 commit 39bbbdb
Show file tree
Hide file tree
Showing 34 changed files with 691 additions and 715 deletions.
4 changes: 2 additions & 2 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@
</dependency>

<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>

<dependency>
Expand Down
135 changes: 74 additions & 61 deletions pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.create;
import static org.apache.pulsar.common.configuration.PulsarConfigurationLoader.isComplete;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FileInputStream;
Expand All @@ -34,10 +31,10 @@
import java.nio.file.Path;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.common.component.LifecycleComponent;
Expand All @@ -63,6 +60,10 @@
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.ScopeType;

public class PulsarBrokerStarter {

Expand All @@ -76,31 +77,31 @@ private static ServiceConfiguration loadConfig(String configFile) throws Excepti
}

@VisibleForTesting
@Parameters(commandDescription = "Options")
@Command(description = "Options", showDefaultValues = true, scope = ScopeType.INHERIT)
private static class StarterArguments {
@Parameter(names = {"-c", "--broker-conf"}, description = "Configuration file for Broker")
@Option(names = {"-c", "--broker-conf"}, description = "Configuration file for Broker")
private String brokerConfigFile = "conf/broker.conf";

@Parameter(names = {"-rb", "--run-bookie"}, description = "Run Bookie together with Broker")
@Option(names = {"-rb", "--run-bookie"}, description = "Run Bookie together with Broker")
private boolean runBookie = false;

@Parameter(names = {"-ra", "--run-bookie-autorecovery"},
@Option(names = {"-ra", "--run-bookie-autorecovery"},
description = "Run Bookie Autorecovery together with broker")
private boolean runBookieAutoRecovery = false;

@Parameter(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
@Option(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
private String bookieConfigFile = "conf/bookkeeper.conf";

@Parameter(names = {"-rfw", "--run-functions-worker"}, description = "Run functions worker with Broker")
@Option(names = {"-rfw", "--run-functions-worker"}, description = "Run functions worker with Broker")
private boolean runFunctionsWorker = false;

@Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
@Option(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
private String fnWorkerConfigFile = "conf/functions_worker.yml";

@Parameter(names = {"-h", "--help"}, description = "Show this help message")
@Option(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;

@Parameter(names = {"-g", "--generate-docs"}, description = "Generate docs")
@Option(names = {"-g", "--generate-docs"}, description = "Generate docs")
private boolean generateDocs = false;
}

Expand All @@ -125,43 +126,47 @@ private static ServerConfiguration readBookieConfFile(String bookieConfigFile) t
return bookieConf;
}

private static boolean argsContains(String[] args, String arg) {
return Arrays.asList(args).contains(arg);
}

private static class BrokerStarter {
private final ServiceConfiguration brokerConfig;
private final PulsarService pulsarService;
private final LifecycleComponent bookieServer;
protected static class BrokerStarter implements Callable<Integer> {
private ServiceConfiguration brokerConfig;
private PulsarService pulsarService;
private LifecycleComponent bookieServer;
private volatile CompletableFuture<Void> bookieStartFuture;
private final AutoRecoveryMain autoRecoveryMain;
private final StatsProvider bookieStatsProvider;
private final ServerConfiguration bookieConfig;
private final WorkerService functionsWorkerService;
private final WorkerConfig workerConfig;

BrokerStarter(String[] args) throws Exception {
StarterArguments starterArguments = new StarterArguments();
JCommander jcommander = new JCommander(starterArguments);
jcommander.setProgramName("PulsarBrokerStarter");

// parse args by JCommander
jcommander.parse(args);
private AutoRecoveryMain autoRecoveryMain;
private StatsProvider bookieStatsProvider;
private ServerConfiguration bookieConfig;
private WorkerService functionsWorkerService;
private WorkerConfig workerConfig;

private CommandLine commander;

private final StarterArguments starterArguments;

BrokerStarter() {
starterArguments = new StarterArguments();
commander = new CommandLine(starterArguments);
commander.setCommandName("PulsarBrokerStarter");
}

public int start(String[] args) {
return commander.execute(args);
}

public Integer call() throws Exception {
if (starterArguments.help) {
jcommander.usage();
System.exit(0);
commander.usage(commander.getOut());
return 0;
}

if (starterArguments.generateDocs) {
CmdGenerateDocs cmd = new CmdGenerateDocs("pulsar");
cmd.addCommand("broker", starterArguments);
cmd.addCommand("broker", commander);
cmd.run(null);
System.exit(0);
return 0;
}

// init broker config
if (isBlank(starterArguments.brokerConfigFile)) {
jcommander.usage();
commander.usage(commander.getOut());
throw new IllegalArgumentException("Need to specify a configuration file for broker");
} else {
final String filepath = Path.of(starterArguments.brokerConfigFile)
Expand Down Expand Up @@ -209,20 +214,16 @@ private static class BrokerStarter {
});

// if no argument to run bookie in cmd line, read from pulsar config
if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) {
checkState(!starterArguments.runBookie,
"runBookie should be false if has no argument specified");
if (!starterArguments.runBookie) {
starterArguments.runBookie = brokerConfig.isEnableRunBookieTogether();
}
if (!argsContains(args, "-ra") && !argsContains(args, "--run-bookie-autorecovery")) {
checkState(!starterArguments.runBookieAutoRecovery,
"runBookieAutoRecovery should be false if has no argument specified");
if (!starterArguments.runBookieAutoRecovery) {
starterArguments.runBookieAutoRecovery = brokerConfig.isEnableRunBookieAutoRecoveryTogether();
}

if ((starterArguments.runBookie || starterArguments.runBookieAutoRecovery)
&& isBlank(starterArguments.bookieConfigFile)) {
jcommander.usage();
&& isBlank(starterArguments.bookieConfigFile)) {
commander.usage(commander.getOut());
throw new IllegalArgumentException("No configuration file for Bookie");
}

Expand Down Expand Up @@ -257,9 +258,7 @@ && isBlank(starterArguments.bookieConfigFile)) {
} else {
autoRecoveryMain = null;
}
}

public void start() throws Exception {
if (bookieStatsProvider != null) {
bookieStatsProvider.start(bookieConfig);
log.info("started bookieStatsProvider.");
Expand All @@ -275,15 +274,17 @@ public void start() throws Exception {

pulsarService.start();
log.info("PulsarService started.");
return 0;
}

public void join() throws InterruptedException {
pulsarService.waitUntilClosed();

try {
pulsarService.close();
} catch (PulsarServerException e) {
throw new RuntimeException();
if (pulsarService != null) {
pulsarService.waitUntilClosed();
try {
pulsarService.close();
} catch (PulsarServerException e) {
throw new RuntimeException();
}
}

if (bookieStartFuture != null) {
Expand All @@ -301,8 +302,10 @@ public void shutdown() throws Exception {
log.info("Shut down functions worker service successfully.");
}

pulsarService.close();
log.info("Shut down broker service successfully.");
if (pulsarService != null) {
pulsarService.close();
log.info("Shut down broker service successfully.");
}

if (bookieStatsProvider != null) {
bookieStatsProvider.stop();
Expand All @@ -317,6 +320,11 @@ public void shutdown() throws Exception {
log.info("Shut down autoRecoveryMain successfully.");
}
}

@VisibleForTesting
CommandLine getCommander() {
return commander;
}
}


Expand All @@ -330,7 +338,7 @@ public static void main(String[] args) throws Exception {
exception.printStackTrace(System.out);
});

BrokerStarter starter = new BrokerStarter(args);
BrokerStarter starter = new BrokerStarter();
Runtime.getRuntime().addShutdownHook(
new Thread(() -> {
try {
Expand All @@ -344,16 +352,21 @@ public static void main(String[] args) throws Exception {
);

PulsarByteBufAllocator.registerOOMListener(oomException -> {
if (starter.brokerConfig.isSkipBrokerShutdownOnOOM()) {
if (starter.brokerConfig != null && starter.brokerConfig.isSkipBrokerShutdownOnOOM()) {
log.error("-- Received OOM exception: {}", oomException.getMessage(), oomException);
} else {
log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
starter.pulsarService.shutdownNow();
if (starter.pulsarService != null) {
starter.pulsarService.shutdownNow();
}
}
});

try {
starter.start();
int start = starter.start(args);
if (start != 0) {
System.exit(start);
}
} catch (Throwable t) {
log.error("Failed to start pulsar service.", t);
ShutdownUtil.triggerImmediateForcefulShutdown();
Expand Down
Loading

0 comments on commit 39bbbdb

Please # to comment.