Skip to content

Commit

Permalink
Use embulk-util-timestamp:0.2.0 instead of org.embulk.spi.time.Timest…
Browse files Browse the repository at this point in the history
…ampParser
  • Loading branch information
dmikurube authored and civitaspo committed Aug 13, 2020
1 parent 603bc9a commit dfea900
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 24 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
// It is just exluded here.
exclude group: "org.msgpack", module: "msgpack-core"
}
compile("org.embulk:embulk-util-timestamp:0.2.0")

// They are once excluded from transitive dependencies of other dependencies,
// and added explicitly with versions exactly the same with embulk-core:0.10.5.
Expand Down
2 changes: 2 additions & 0 deletions gradle/dependency-locks/embulkPluginRuntime.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ com.jayway.jsonpath:json-path:2.4.0
net.minidev:accessors-smart:1.2
net.minidev:json-smart:2.3
org.embulk:embulk-util-json:0.1.0
org.embulk:embulk-util-rubytime:0.3.0
org.embulk:embulk-util-timestamp:0.2.0
org.ow2.asm:asm:5.0.4
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.embulk.spi.FilterPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.time.TimestampParser;
import org.embulk.spi.type.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,7 +32,7 @@ public class ExpandJsonFilterPlugin
private static final Logger logger = LoggerFactory.getLogger(ExpandJsonFilterPlugin.class);

public interface PluginTask
extends Task, TimestampParser.Task
extends Task
{
@Config("json_column_name")
String getJsonColumnName();
Expand All @@ -45,7 +44,19 @@ public interface PluginTask
@Config("expanded_columns")
List<ColumnConfig> getExpandedColumns();

// default_timezone option from TimestampParser.Task
// default_timezone and other options copied from TimestampParser.Task

@Config("default_timezone")
@ConfigDefault("\"UTC\"")
String getDefaultTimeZoneId();

@Config("default_timestamp_format")
@ConfigDefault("\"%Y-%m-%d %H:%M:%S.%N %z\"")
String getDefaultTimestampFormat();

@Config("default_date")
@ConfigDefault("\"1970-01-01\"")
String getDefaultDate();

@Config("stop_on_invalid_record")
@ConfigDefault("false")
Expand Down
57 changes: 37 additions & 20 deletions src/main/java/org/embulk/filter/expand_json/FilteredPageOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.ReadContext;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.Task;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnConfig;
Expand All @@ -17,14 +19,15 @@
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.time.TimestampParseException;
import org.embulk.spi.time.TimestampParser;
import org.embulk.spi.time.Timestamp;
import org.embulk.spi.type.Types;
import org.embulk.util.json.JsonParseException;
import org.embulk.util.json.JsonParser;
import org.embulk.util.timestamp.TimestampFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -41,17 +44,17 @@ private class ExpandedColumn
private final String key;
private final Column column;
private final String jsonPath;
private final Optional<TimestampParser> timestampParser;
private final Optional<TimestampFormatter> timestampFormatter;

ExpandedColumn(String key,
Column column,
String jsonPath,
Optional<TimestampParser> timestampParser)
Optional<TimestampFormatter> timestampFormatter)
{
this.key = key;
this.column = column;
this.jsonPath = jsonPath;
this.timestampParser = timestampParser;
this.timestampFormatter = timestampFormatter;
}

public String getKey()
Expand All @@ -69,9 +72,9 @@ public String getJsonPath()
return jsonPath;
}

public Optional<TimestampParser> getTimestampParser()
public Optional<TimestampFormatter> getTimestampFormatter()
{
return timestampParser;
return timestampFormatter;
}
}

Expand Down Expand Up @@ -104,16 +107,30 @@ public Column getOutputColumn()
}
}

// Copied from org.embulk.spi.time.TimestampParser.TimestampColumnOption for embulk-util-timestamp.
private interface TimestampColumnOption
extends Task, TimestampParser.TimestampColumnOption
{
extends Task {
@Config("timezone")
@ConfigDefault("null")
Optional<String> getTimeZoneId();

@Config("format")
@ConfigDefault("null")
Optional<String> getFormat();

@Config("date")
@ConfigDefault("null")
Optional<String> getDate();
}

private static TimestampParser createTimestampParser(final PluginTask task,
final ColumnConfig columnConfig)
private static TimestampFormatter createTimestampFormatter(final PluginTask task,
final ColumnConfig columnConfig)
{
final TimestampColumnOption columnOption = columnConfig.getOption().loadConfig(TimestampColumnOption.class);
return new TimestampParser(task, columnOption);
return TimestampFormatter.builder(columnOption.getFormat().orElse(task.getDefaultTimestampFormat()), true)
.setDefaultZoneFromString(columnOption.getTimeZoneId().orElse(task.getDefaultTimeZoneId()))
.setDefaultDateFromString(columnOption.getDate().orElse(task.getDefaultDate()))
.build();
}

private static final Logger logger = LoggerFactory.getLogger(FilteredPageOutput.class);
Expand All @@ -136,15 +153,15 @@ private List<ExpandedColumn> initializeExpandedColumns(PluginTask task,
for (ColumnConfig expandedColumnConfig : task.getExpandedColumns()) {
if (outputColumn.getName().equals(expandedColumnConfig.getName())) {

TimestampParser timestampParser = null;
TimestampFormatter timestampFormatter = null;
if (Types.TIMESTAMP.equals(expandedColumnConfig.getType())) {
timestampParser = createTimestampParser(task, expandedColumnConfig);
timestampFormatter = createTimestampFormatter(task, expandedColumnConfig);
}

ExpandedColumn expandedColumn = new ExpandedColumn(outputColumn.getName(),
outputColumn,
task.getRoot() + outputColumn.getName(),
Optional.ofNullable(timestampParser));
Optional.ofNullable(timestampFormatter));
expandedJsonColumnsBuilder.add(expandedColumn);
}
}
Expand Down Expand Up @@ -336,17 +353,17 @@ else if (Types.LONG.equals(expandedJsonColumn.getColumn().getType())) {
}
}
else if (Types.TIMESTAMP.equals(expandedJsonColumn.getColumn().getType())) {
if (expandedJsonColumn.getTimestampParser().isPresent()) {
TimestampParser parser = expandedJsonColumn.getTimestampParser().get();
if (expandedJsonColumn.getTimestampFormatter().isPresent()) {
TimestampFormatter formatter = expandedJsonColumn.getTimestampFormatter().get();
try {
pageBuilder.setTimestamp(expandedJsonColumn.getColumn(), parser.parse(finalValue));
pageBuilder.setTimestamp(expandedJsonColumn.getColumn(), Timestamp.ofInstant(formatter.parse(finalValue)));
}
catch (TimestampParseException e) {
catch (DateTimeParseException e) {
throw new JsonValueInvalidException(String.format("Failed to parse '%s' as timestamp", finalValue), e);
}
}
else {
throw new RuntimeException("TimestampParser is absent for column:" + expandedJsonColumn.getKey());
throw new RuntimeException("TimestampFormatter is absent for column:" + expandedJsonColumn.getKey());
}
}
else if (Types.JSON.equals(expandedJsonColumn.getColumn().getType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testDefaultValue()
PluginTask task = config.loadConfig(PluginTask.class);

assertEquals("$.", task.getRoot());
assertEquals("UTC", task.getDefaultTimeZone().getID());
assertEquals("UTC", task.getDefaultTimeZoneId());
assertEquals("%Y-%m-%d %H:%M:%S.%N %z", task.getDefaultTimestampFormat());
assertEquals(false, task.getStopOnInvalidRecord());
assertEquals(false, task.getKeepExpandingJsonColumn());
Expand Down

0 comments on commit dfea900

Please # to comment.