Skip to content

Commit

Permalink
Merge pull request #48 from dmikurube/catch-up-v0.10.6
Browse files Browse the repository at this point in the history
Catch up with Embulk v0.10.6
  • Loading branch information
civitaspo authored Aug 13, 2020
2 parents 542b4db + dfea900 commit 833e148
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 47 deletions.
42 changes: 36 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
id "java"
id "checkstyle"
id "maven-publish"
id "org.embulk.embulk-plugins" version "0.4.1"
id "org.embulk.embulk-plugins" version "0.4.2"
}
repositories {
mavenCentral()
Expand All @@ -19,18 +19,40 @@ sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
compileOnly("org.embulk:embulk-core:0.9.23")
compileOnly("org.embulk:embulk-api:0.10.6")
compileOnly("org.embulk:embulk-core:0.10.6")
compile("com.jayway.jsonpath:json-path:2.4.0") {
exclude group: "org.slf4j", module: "slf4j-api"
}
compile("org.embulk:embulk-util-json:0.1.0") {
// They conflict with embulk-core:0.10.6. They are first excluded here,
// and then added back explicitly with exactly the same versions as embulk-core:0.10.6.
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"

// It conflicts with embulk-core:0.10.6, and it is expected to stay in embulk-api.
// It is simply excluded here.
exclude group: "org.msgpack", module: "msgpack-core"
}
compile("org.embulk:embulk-util-timestamp:0.2.0")

// They are first excluded from the transitive dependencies of other dependencies,
// and then added back explicitly with exactly the same versions as embulk-core:0.10.6.
compile("com.fasterxml.jackson.core:jackson-annotations:2.6.7")
compile("com.fasterxml.jackson.core:jackson-core:2.6.7")
compile("com.fasterxml.jackson.core:jackson-databind:2.6.7")

testImplementation "junit:junit:4.+"
testImplementation "org.embulk:embulk-core:0.9.23:tests"
testImplementation "org.embulk:embulk-core:0.9.23"
testImplementation "org.embulk:embulk-core:0.10.6:tests"
testImplementation "org.embulk:embulk-core:0.10.6"

// TODO: Remove them.
// They are currently required because their dependency libraries are behind sub ClassLoaders.
// Including them in "testImplementation" is a tentative workaround.
testImplementation "org.embulk:embulk-deps-buffer:0.9.23"
testImplementation "org.embulk:embulk-deps-config:0.9.23"
testImplementation "org.embulk:embulk-deps-buffer:0.10.6"
testImplementation "org.embulk:embulk-deps-config:0.10.6"
testImplementation "org.embulk:embulk-deps-timestamp:0.10.6"
}

test {
Expand All @@ -42,6 +64,14 @@ embulkPlugin {
mainClass = "org.embulk.filter.expand_json.ExpandJsonFilterPlugin"
category = "filter"
type = "expand_json"
ignoreConflicts = [
// They conflict with embulk-core:0.10.6, but embulk-core will remove them from its dependencies.
// They are intentionally included in the plugin's dependencies to keep it working before and after the removal.
// The conflict warnings for them are explicitly ignored here.
[ group: "com.fasterxml.jackson.core", module: "jackson-annotations" ],
[ group: "com.fasterxml.jackson.core", module: "jackson-core" ],
[ group: "com.fasterxml.jackson.core", module: "jackson-databind" ],
]
}

publishing {
Expand Down
6 changes: 6 additions & 0 deletions gradle/dependency-locks/embulkPluginRuntime.lockfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
com.fasterxml.jackson.core:jackson-annotations:2.6.7
com.fasterxml.jackson.core:jackson-core:2.6.7
com.fasterxml.jackson.core:jackson-databind:2.6.7
com.jayway.jsonpath:json-path:2.4.0
net.minidev:accessors-smart:1.2
net.minidev:json-smart:2.3
org.embulk:embulk-util-json:0.1.0
org.embulk:embulk-util-rubytime:0.3.0
org.embulk:embulk-util-timestamp:0.2.0
org.ow2.asm:asm:5.0.4
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.embulk.filter.expand_json;

import com.google.common.collect.ImmutableList;
import com.jayway.jsonpath.JsonPathException;
import com.jayway.jsonpath.spi.cache.Cache;
import com.jayway.jsonpath.spi.cache.CacheProvider;
Expand All @@ -14,26 +13,26 @@
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnConfig;
import org.embulk.spi.Exec;
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.time.TimestampParser;
import org.embulk.spi.type.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

public class ExpandJsonFilterPlugin
implements FilterPlugin
{
private final Logger logger = Exec.getLogger(ExpandJsonFilterPlugin.class);
private static final Logger logger = LoggerFactory.getLogger(ExpandJsonFilterPlugin.class);

public interface PluginTask
extends Task, TimestampParser.Task
extends Task
{
@Config("json_column_name")
String getJsonColumnName();
Expand All @@ -45,7 +44,19 @@ public interface PluginTask
@Config("expanded_columns")
List<ColumnConfig> getExpandedColumns();

// default_timezone option from TimestampParser.Task
// default_timezone and other options copied from TimestampParser.Task

@Config("default_timezone")
@ConfigDefault("\"UTC\"")
String getDefaultTimeZoneId();

@Config("default_timestamp_format")
@ConfigDefault("\"%Y-%m-%d %H:%M:%S.%N %z\"")
String getDefaultTimestampFormat();

@Config("default_date")
@ConfigDefault("\"1970-01-01\"")
String getDefaultDate();

@Config("stop_on_invalid_record")
@ConfigDefault("false")
Expand Down Expand Up @@ -100,7 +111,7 @@ public PageOutput open(TaskSource taskSource, final Schema inputSchema,

private Schema buildOutputSchema(PluginTask task, Schema inputSchema)
{
ImmutableList.Builder<Column> builder = ImmutableList.builder();
final ArrayList<Column> builder = new ArrayList<>();

int i = 0; // columns index
for (Column inputColumn: inputSchema.getColumns()) {
Expand Down Expand Up @@ -142,7 +153,7 @@ private Schema buildOutputSchema(PluginTask task, Schema inputSchema)
}
}

return new Schema(builder.build());
return new Schema(Collections.unmodifiableList(builder));
}

private void validateExpandedColumns(List<ColumnConfig> expandedColumns)
Expand Down
86 changes: 54 additions & 32 deletions src/main/java/org/embulk/filter/expand_json/FilteredPageOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.ReadContext;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.Task;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnConfig;
Expand All @@ -20,15 +19,20 @@
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.json.JsonParseException;
import org.embulk.spi.json.JsonParser;
import org.embulk.spi.time.TimestampParseException;
import org.embulk.spi.time.TimestampParser;
import org.embulk.spi.time.Timestamp;
import org.embulk.spi.type.Types;
import org.embulk.util.json.JsonParseException;
import org.embulk.util.json.JsonParser;
import org.embulk.util.timestamp.TimestampFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.embulk.filter.expand_json.ExpandJsonFilterPlugin.PluginTask;

Expand All @@ -40,17 +44,17 @@ private class ExpandedColumn
private final String key;
private final Column column;
private final String jsonPath;
private final Optional<TimestampParser> timestampParser;
private final Optional<TimestampFormatter> timestampFormatter;

ExpandedColumn(String key,
Column column,
String jsonPath,
Optional<TimestampParser> timestampParser)
Optional<TimestampFormatter> timestampFormatter)
{
this.key = key;
this.column = column;
this.jsonPath = jsonPath;
this.timestampParser = timestampParser;
this.timestampFormatter = timestampFormatter;
}

public String getKey()
Expand All @@ -68,9 +72,9 @@ public String getJsonPath()
return jsonPath;
}

public Optional<TimestampParser> getTimestampParser()
public Optional<TimestampFormatter> getTimestampFormatter()
{
return timestampParser;
return timestampFormatter;
}
}

Expand Down Expand Up @@ -103,19 +107,33 @@ public Column getOutputColumn()
}
}

// Copied from org.embulk.spi.time.TimestampParser.TimestampColumnOption for embulk-util-timestamp.
private interface TimestampColumnOption
extends Task, TimestampParser.TimestampColumnOption
{
extends Task {
@Config("timezone")
@ConfigDefault("null")
Optional<String> getTimeZoneId();

@Config("format")
@ConfigDefault("null")
Optional<String> getFormat();

@Config("date")
@ConfigDefault("null")
Optional<String> getDate();
}

private static TimestampParser createTimestampParser(final PluginTask task,
final ColumnConfig columnConfig)
private static TimestampFormatter createTimestampFormatter(final PluginTask task,
final ColumnConfig columnConfig)
{
final TimestampColumnOption columnOption = columnConfig.getOption().loadConfig(TimestampColumnOption.class);
return new TimestampParser(task, columnOption);
return TimestampFormatter.builder(columnOption.getFormat().orElse(task.getDefaultTimestampFormat()), true)
.setDefaultZoneFromString(columnOption.getTimeZoneId().orElse(task.getDefaultTimeZoneId()))
.setDefaultDateFromString(columnOption.getDate().orElse(task.getDefaultDate()))
.build();
}

private final Logger logger = Exec.getLogger(FilteredPageOutput.class);
private static final Logger logger = LoggerFactory.getLogger(FilteredPageOutput.class);
private final boolean stopOnInvalidRecord;
private final boolean keepExpandingJsonColumn;
private final List<UnchangedColumn> unchangedColumns;
Expand All @@ -130,32 +148,32 @@ private static TimestampParser createTimestampParser(final PluginTask task,
private List<ExpandedColumn> initializeExpandedColumns(PluginTask task,
Schema outputSchema)
{
ImmutableList.Builder<ExpandedColumn> expandedJsonColumnsBuilder = ImmutableList.builder();
final ArrayList<ExpandedColumn> expandedJsonColumnsBuilder = new ArrayList<>();
for (Column outputColumn : outputSchema.getColumns()) {
for (ColumnConfig expandedColumnConfig : task.getExpandedColumns()) {
if (outputColumn.getName().equals(expandedColumnConfig.getName())) {

TimestampParser timestampParser = null;
TimestampFormatter timestampFormatter = null;
if (Types.TIMESTAMP.equals(expandedColumnConfig.getType())) {
timestampParser = createTimestampParser(task, expandedColumnConfig);
timestampFormatter = createTimestampFormatter(task, expandedColumnConfig);
}

ExpandedColumn expandedColumn = new ExpandedColumn(outputColumn.getName(),
outputColumn,
task.getRoot() + outputColumn.getName(),
Optional.fromNullable(timestampParser));
Optional.ofNullable(timestampFormatter));
expandedJsonColumnsBuilder.add(expandedColumn);
}
}
}
return expandedJsonColumnsBuilder.build();
return Collections.unmodifiableList(expandedJsonColumnsBuilder);
}

private List<UnchangedColumn> initializeUnchangedColumns(Schema inputSchema,
Schema outputSchema,
Column excludeColumn)
{
ImmutableList.Builder<UnchangedColumn> unchangedColumnsBuilder = ImmutableList.builder();
final ArrayList<UnchangedColumn> unchangedColumnsBuilder = new ArrayList<>();
for (Column outputColumn : outputSchema.getColumns()) {
for (Column inputColumn : inputSchema.getColumns()) {
if (inputColumn.getName().equals(outputColumn.getName()) &&
Expand All @@ -168,7 +186,7 @@ private List<UnchangedColumn> initializeUnchangedColumns(Schema inputSchema,
}
}
}
return unchangedColumnsBuilder.build();
return Collections.unmodifiableList(unchangedColumnsBuilder);
}

private Column initializeJsonColumn(PluginTask task, Schema inputSchema)
Expand Down Expand Up @@ -238,7 +256,7 @@ public void close()
pageBuilder.close();
}


private void setUnchangedColumns() {
for (UnchangedColumn unchangedColumn : unchangedColumns) {
Column inputColumn = unchangedColumn.getInputColumn();
Expand Down Expand Up @@ -286,7 +304,11 @@ private void setExpandedJsonColumns()
jsonObject = pageReader.getString(jsonColumn);
}

json = Strings.isNullOrEmpty(jsonObject) ? null : parseContext.parse(jsonObject);
if (jsonObject == null || jsonObject.isEmpty()) {
json = null;
} else {
json = parseContext.parse(jsonObject);
}
}

for (ExpandedColumn expandedJsonColumn: expandedColumns) {
Expand Down Expand Up @@ -331,17 +353,17 @@ else if (Types.LONG.equals(expandedJsonColumn.getColumn().getType())) {
}
}
else if (Types.TIMESTAMP.equals(expandedJsonColumn.getColumn().getType())) {
if (expandedJsonColumn.getTimestampParser().isPresent()) {
TimestampParser parser = expandedJsonColumn.getTimestampParser().get();
if (expandedJsonColumn.getTimestampFormatter().isPresent()) {
TimestampFormatter formatter = expandedJsonColumn.getTimestampFormatter().get();
try {
pageBuilder.setTimestamp(expandedJsonColumn.getColumn(), parser.parse(finalValue));
pageBuilder.setTimestamp(expandedJsonColumn.getColumn(), Timestamp.ofInstant(formatter.parse(finalValue)));
}
catch (TimestampParseException e) {
catch (DateTimeParseException e) {
throw new JsonValueInvalidException(String.format("Failed to parse '%s' as timestamp", finalValue), e);
}
}
else {
throw new RuntimeException("TimestampParser is absent for column:" + expandedJsonColumn.getKey());
throw new RuntimeException("TimestampFormatter is absent for column:" + expandedJsonColumn.getKey());
}
}
else if (Types.JSON.equals(expandedJsonColumn.getColumn().getType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testDefaultValue()
PluginTask task = config.loadConfig(PluginTask.class);

assertEquals("$.", task.getRoot());
assertEquals("UTC", task.getDefaultTimeZone().getID());
assertEquals("UTC", task.getDefaultTimeZoneId());
assertEquals("%Y-%m-%d %H:%M:%S.%N %z", task.getDefaultTimestampFormat());
assertEquals(false, task.getStopOnInvalidRecord());
assertEquals(false, task.getKeepExpandingJsonColumn());
Expand Down

0 comments on commit 833e148

Please sign in to comment.