Skip to content

Commit

Permalink
Merge pull request #23 from sakama/improve-exception-handling
Browse files Browse the repository at this point in the history
[Bugfix] Improve Exception handling
  • Loading branch information
civitaspo authored Jul 27, 2016
2 parents 1f19d6b + a8095b9 commit 39a7c2e
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.embulk.spi.PageOutput;
import org.embulk.spi.PageReader;
import org.embulk.spi.Schema;
import org.embulk.spi.json.JsonParseException;
import org.embulk.spi.json.JsonParser;
import org.embulk.spi.time.TimestampParseException;
import org.embulk.spi.time.TimestampParser;
import org.embulk.spi.type.Types;
import org.joda.time.DateTimeZone;
Expand Down Expand Up @@ -308,22 +310,42 @@ else if (Types.BOOLEAN.equals(expandedJsonColumn.getColumn().getType())) {
pageBuilder.setBoolean(expandedJsonColumn.getColumn(), Boolean.parseBoolean(finalValue));
}
else if (Types.DOUBLE.equals(expandedJsonColumn.getColumn().getType())) {
pageBuilder.setDouble(expandedJsonColumn.getColumn(), Double.parseDouble(finalValue));
try {
pageBuilder.setDouble(expandedJsonColumn.getColumn(), Double.parseDouble(finalValue));
}
catch (NumberFormatException e) {
throw new JsonValueInvalidException(String.format("Failed to parse '%s' as double", finalValue), e);
}
}
else if (Types.LONG.equals(expandedJsonColumn.getColumn().getType())) {
pageBuilder.setLong(expandedJsonColumn.getColumn(), Long.parseLong(finalValue));
try {
pageBuilder.setLong(expandedJsonColumn.getColumn(), Long.parseLong(finalValue));
}
catch (NumberFormatException e) {
throw new JsonValueInvalidException(String.format("Failed to parse '%s' as long", finalValue), e);
}
}
else if (Types.TIMESTAMP.equals(expandedJsonColumn.getColumn().getType())) {
if (expandedJsonColumn.getTimestampParser().isPresent()) {
TimestampParser parser = expandedJsonColumn.getTimestampParser().get();
pageBuilder.setTimestamp(expandedJsonColumn.getColumn(), parser.parse(finalValue));
try {
pageBuilder.setTimestamp(expandedJsonColumn.getColumn(), parser.parse(finalValue));
}
catch (TimestampParseException e) {
throw new JsonValueInvalidException(String.format("Failed to parse '%s' as timestamp", finalValue), e);
}
}
else {
throw new RuntimeException("TimestampParser is absent for column:" + expandedJsonColumn.getKey());
}
}
else if (Types.JSON.equals(expandedJsonColumn.getColumn().getType())) {
pageBuilder.setJson(expandedJsonColumn.getColumn(), jsonParser.parse(finalValue));
try {
pageBuilder.setJson(expandedJsonColumn.getColumn(), jsonParser.parse(finalValue));
}
catch (JsonParseException e) {
throw new JsonValueInvalidException(String.format("Failed to parse '%s' as JSON", finalValue), e);
}
}
}
}
Expand All @@ -347,4 +369,12 @@ else if (value instanceof String) {
return String.valueOf(value);
}
}

private class JsonValueInvalidException extends DataException
{
JsonValueInvalidException(String message, Throwable cause)
{
super(message, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,65 @@ public void run(TaskSource taskSource, Schema outputSchema)
});
}

@Test(expected = DataException.class)
public void testSetExpandedJsonColumnsSetInvalidDoubleValue()
{
setExpandedJsonColumnsWithInvalidValue("double", s("abcde"));
}

@Test(expected = DataException.class)
public void testSetExpandedJsonColumnsSetInvalidLongValue()
{
setExpandedJsonColumnsWithInvalidValue("long", s("abcde"));
}

@Test(expected = DataException.class)
public void testSetExpandedJsonColumnsSetInvalidTimestampValue()
{
setExpandedJsonColumnsWithInvalidValue("timestamp", s("abcde"));
}

@Test(expected = DataException.class)
public void testSetExpandedJsonColumnsSetInvalidJsonValue()
{
setExpandedJsonColumnsWithInvalidValue("json", s("abcde"));
}

public void setExpandedJsonColumnsWithInvalidValue(String ValidType, final Value invalidValue)
{
String configYaml = "" +
"type: expand_json\n" +
"stop_on_invalid_record: 1\n" +
"json_column_name: _c0\n" +
"root: $.\n" +
"time_zone: Asia/Tokyo\n" +
"expanded_columns:\n" +
" - {name: _j0, type: " + ValidType + "}\n";

ConfigSource config = getConfigFromYaml(configYaml);
final Schema schema = schema("_c0", JSON, "_c1", STRING);

expandJsonFilterPlugin.transaction(config, schema, new Control()
{
@Override
public void run(TaskSource taskSource, Schema outputSchema)
{
MockPageOutput mockPageOutput = new MockPageOutput();
Value data = newMapBuilder()
.put(s("_j0"), invalidValue)
.build();

try (PageOutput pageOutput = expandJsonFilterPlugin.open(taskSource, schema, outputSchema, mockPageOutput)) {
for (Page page : PageTestUtils.buildPage(runtime.getBufferAllocator(), schema, data, c1Data)) {
pageOutput.add(page);
}

pageOutput.finish();
}
}
});
}

@Test
public void testExpandedJsonValuesWithKeepJsonColumns()
{
Expand Down

0 comments on commit 39a7c2e

Please # to comment.