Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[CAY-1207] Decouple Pregel framework from pagerank app #1212

Merged
merged 15 commits into from
Jul 21, 2017
Merged
4 changes: 2 additions & 2 deletions pregel/bin/run_pregel.sh → pregel/bin/run_pagerank.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

# EXAMPLE USAGE
# ./run_pregel.sh -input_path inputs/adj_list
# ./run_pagerank.sh -input_path file://$(pwd)/inputs/adj_list

SELF_JAR=`echo ../target/pregel-*-shaded.jar`

Expand All @@ -24,7 +24,7 @@ CLASSPATH=$YARN_HOME/share/hadoop/common/*:$YARN_HOME/share/hadoop/common/lib/*:

YARN_CONF_DIR=$YARN_HOME/etc/hadoop

JOB=edu.snu.cay.pregel.PregelLauncher
JOB=edu.snu.cay.pregel.graphapps.pagerank.PagerankET

CMD="java -cp $YARN_CONF_DIR:$SELF_JAR:$CLASSPATH $LOGGING_CONFIG $JOB $*"
echo $CMD
Expand Down
122 changes: 122 additions & 0 deletions pregel/src/main/java/edu/snu/cay/pregel/PregelConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright (C) 2017 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.cay.pregel;

import edu.snu.cay.pregel.common.DefaultVertexCodec;
import edu.snu.cay.pregel.graph.api.Computation;
import edu.snu.cay.services.et.evaluator.api.DataParser;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.tang.annotations.Name;

import java.util.ArrayList;
import java.util.List;

/**
* Job configuration of a Pregel on ET application.
*
* Call {@code newBuilder} and supply classes for {@link Computation},
* {@link DataParser} and {@link Codec}s.
* Use with {@link PregelLauncher#launch(String[], PregelConfiguration)} to launch application.
*/
@ClientSide
public final class PregelConfiguration {

private final Class<? extends Computation> computationClass;

private final Class<? extends Codec> vertexCodecClass;
private final Class<? extends DataParser> dataParserClass;

private final Class<? extends Codec> messageCodecClass;
private final List<Class<? extends Name<?>>> userParamList;

private PregelConfiguration(final Class<? extends Computation> computationClass,
final Class<? extends Codec> vertexCodecClass,
final Class<? extends DataParser> dataParserClass,
final Class<? extends Codec> messageCodecClass,
final List<Class<? extends Name<?>>> userParamList) {
this.computationClass = computationClass;
this.vertexCodecClass = vertexCodecClass;
this.dataParserClass = dataParserClass;
this.messageCodecClass = messageCodecClass;
this.userParamList = userParamList;
}

public Class<? extends Computation> getComputationClass() {
return computationClass;
}

public Class<? extends Codec> getVertexCodecClass() {
return vertexCodecClass;
}

public Class<? extends DataParser> getDataParserClass() {
return dataParserClass;
}

public Class<? extends Codec> getMessageCodecClass() {
return messageCodecClass;
}

public List<Class<? extends Name<?>>> getUserParamList() {
return userParamList;
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder implements org.apache.reef.util.Builder<PregelConfiguration> {

private Class<? extends Computation> computationClass;
private Class<? extends Codec> vertexCodecClass = DefaultVertexCodec.class;
private Class<? extends DataParser> dataParserClass;

private Class<? extends Codec> messageCodecClass;
private List<Class<? extends Name<?>>> userParamList = new ArrayList<>();

public Builder setComputationClass(final Class<? extends Computation> computationClass) {
this.computationClass = computationClass;
return this;
}

public Builder setVertexCodecClass(final Class<? extends Codec> vertexCodecClass) {
this.vertexCodecClass = vertexCodecClass;
return this;
}

public Builder setDataParserClass(final Class<? extends DataParser> dataParserClass) {
this.dataParserClass = dataParserClass;
return this;
}

public Builder setMessageCodecClass(final Class<? extends Codec> messageCodecClass) {
this.messageCodecClass = messageCodecClass;
return this;
}

public Builder addParameterClass(final Class<? extends Name<?>> parameterClass) {
userParamList.add(parameterClass);
return this;
}

@Override
public PregelConfiguration build() {
return new PregelConfiguration(computationClass, vertexCodecClass, dataParserClass,
messageCodecClass, userParamList);
}
}
}
54 changes: 37 additions & 17 deletions pregel/src/main/java/edu/snu/cay/pregel/PregelDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package edu.snu.cay.pregel;

import edu.snu.cay.common.centcomm.master.CentCommConfProvider;
import edu.snu.cay.pregel.common.AddDoubleUpdateFunction;
import edu.snu.cay.pregel.common.DoubleMsgCodec;
import edu.snu.cay.pregel.common.NoneEdgeValueGraphParser;
import edu.snu.cay.pregel.common.VertexCodec;
import edu.snu.cay.pregel.common.DefaultVertexCodec;
import edu.snu.cay.pregel.PregelParameters.*;
import edu.snu.cay.pregel.common.MessageUpdateFunction;
import edu.snu.cay.services.et.configuration.ExecutorConfiguration;
import edu.snu.cay.services.et.configuration.RemoteAccessConfiguration;
import edu.snu.cay.services.et.configuration.ResourceConfiguration;
Expand All @@ -28,17 +28,25 @@
import edu.snu.cay.services.et.driver.api.ETMaster;
import edu.snu.cay.services.et.driver.impl.AllocatedTable;
import edu.snu.cay.services.et.driver.impl.SubmittedTask;
import edu.snu.cay.services.et.evaluator.api.DataParser;
import edu.snu.cay.services.et.evaluator.impl.ExistKeyBulkDataLoader;
import edu.snu.cay.services.et.evaluator.impl.VoidUpdateFunction;
import edu.snu.cay.utils.ConfigurationUtils;
import org.apache.reef.driver.task.TaskConfiguration;
import org.apache.reef.io.serialization.Codec;
import org.apache.reef.io.serialization.SerializableCodec;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.time.event.StartTime;

import javax.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -63,16 +71,17 @@ public final class PregelDriver {

private final ETMaster etMaster;
private final ExecutorConfiguration executorConf;
private final String tableInputPath;
private final AtomicInteger workerCounter = new AtomicInteger(0);
private final Injector masterConfInjector;
private final Configuration taskConf;

@Inject
private PregelDriver(final ETMaster etMaster,
final CentCommConfProvider centCommConfProvider,
final PregelMaster pregelMaster,
@Parameter(PregelLauncher.InputPath.class) final String tableInputPath) {
@Parameter(SerializedTaskConf.class) final String serializedTaskConf,
@Parameter(SerializedMasterConf.class) final String serializedMasterConf) throws IOException {
this.etMaster = etMaster;
this.tableInputPath = tableInputPath;
this.executorConf = ExecutorConfiguration.newBuilder()
.setResourceConf(ResourceConfiguration.newBuilder()
.setNumCores(1)
Expand All @@ -87,6 +96,9 @@ private PregelDriver(final ETMaster etMaster,
.setUserContextConf(centCommConfProvider.getContextConfiguration())
.setUserServiceConf(centCommConfProvider.getServiceConfWithoutNameResolver())
.build();

this.masterConfInjector = Tang.Factory.getTang().newInjector(ConfigurationUtils.fromString(serializedMasterConf));
this.taskConf = ConfigurationUtils.fromString(serializedTaskConf);
}

public final class StartHandler implements EventHandler<StartTime> {
Expand All @@ -109,7 +121,7 @@ public void onNext(final StartTime startTime) {
final AllocatedTable vertexTable = etMaster.createTable(
buildVertexTableConf(VERTEX_TABLE_ID), executors).get();

vertexTable.load(executors, tableInputPath).get();
vertexTable.load(executors, masterConfInjector.getNamedInstance(InputPath.class)).get();

final List<Future<SubmittedTask>> taskFutureList = new ArrayList<>();
executors.forEach(executor -> taskFutureList.add(executor.submitTask(buildTaskConf())));
Expand All @@ -119,37 +131,42 @@ public void onNext(final StartTime startTime) {
}
executors.forEach(AllocatedExecutor::close);

} catch (InterruptedException | ExecutionException e) {
} catch (InterruptedException | ExecutionException | InjectionException e) {
throw new RuntimeException(e);
}
}).start();
}
}

private Configuration buildTaskConf() {
return TaskConfiguration.CONF
return Configurations.merge(taskConf, TaskConfiguration.CONF
.set(TaskConfiguration.IDENTIFIER, WORKER_PREFIX + workerCounter.getAndIncrement())
.set(TaskConfiguration.TASK, PregelWorkerTask.class)
.build();
.build());
}

/**
* Build a configuration of vertex table.
* Type of value is {@link edu.snu.cay.pregel.graph.api.Vertex} so set {@link VertexCodec} to value codec class.
* Type of value is {@link edu.snu.cay.pregel.graph.api.Vertex}
* so set {@link DefaultVertexCodec} to value codec class.
* Note that this configuration is for Pagerank app.
*
* @param tableId an identifier of {@link TableConfiguration}
*/
private TableConfiguration buildVertexTableConf(final String tableId) {
private TableConfiguration buildVertexTableConf(final String tableId) throws InjectionException {

final Codec vertexCodec = masterConfInjector.getNamedInstance(VertexCodec.class);
final DataParser dataParser = masterConfInjector.getInstance(DataParser.class);

return TableConfiguration.newBuilder()
.setId(tableId)
.setKeyCodecClass(SerializableCodec.class)
.setValueCodecClass(VertexCodec.class)
.setValueCodecClass(vertexCodec.getClass())
.setUpdateValueCodecClass(SerializableCodec.class)
.setUpdateFunctionClass(VoidUpdateFunction.class)
.setIsMutableTable(true)
.setIsOrderedTable(false)
.setDataParserClass(NoneEdgeValueGraphParser.class)
.setDataParserClass(dataParser.getClass())
.setBulkDataLoaderClass(ExistKeyBulkDataLoader.class)
.build();
}
Expand All @@ -161,13 +178,16 @@ private TableConfiguration buildVertexTableConf(final String tableId) {
*
* @param tableId an identifier of {@link TableConfiguration}
*/
private TableConfiguration buildMsgTableConf(final String tableId) {
private TableConfiguration buildMsgTableConf(final String tableId) throws InjectionException {

final Codec messageCodec = masterConfInjector.getNamedInstance(MessageCodec.class);

return TableConfiguration.newBuilder()
.setId(tableId)
.setKeyCodecClass(SerializableCodec.class)
.setValueCodecClass(DoubleMsgCodec.class)
.setValueCodecClass(messageCodec.getClass())
.setUpdateValueCodecClass(SerializableCodec.class)
.setUpdateFunctionClass(AddDoubleUpdateFunction.class)
.setUpdateFunctionClass(MessageUpdateFunction.class)
.setIsMutableTable(true)
.setIsOrderedTable(false)
.build();
Expand Down
Loading