Skip to content

Commit

Permalink
[WIP]
Browse files Browse the repository at this point in the history
  • Loading branch information
dmikurube committed Jul 25, 2023
1 parent d585ba1 commit 9c20304
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 33 deletions.
10 changes: 9 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ configurations {
runtimeClasspath.resolutionStrategy.activateDependencyLocking()
}

// Adds a directory from the neighbouring ../embulk-util-json checkout to this project's main Java
// source set, so those sources are compiled together with the plugin.
// NOTE(review): the path points at src/main/ rather than src/main/java — confirm this is the
// intended source root.
sourceSets {
main {
java {
srcDir "../embulk-util-json/src/main/"
}
}
}

tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:deprecation" << "-Xlint:unchecked"
options.encoding = "UTF-8"
Expand All @@ -43,7 +51,7 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind:2.6.7.5"
implementation "org.embulk:embulk-util-config:0.3.4"
implementation "org.embulk:embulk-util-file:0.1.5"
implementation "org.embulk:embulk-util-json:0.2.2"
// implementation "org.embulk:embulk-util-json:0.2.2"
implementation "org.embulk:embulk-util-timestamp:0.2.2"

testImplementation "junit:junit:4.13.2"
Expand Down
1 change: 0 additions & 1 deletion gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ javax.validation:validation-api:1.1.0.Final=compileClasspath,runtimeClasspath
org.embulk:embulk-spi:0.11=compileClasspath
org.embulk:embulk-util-config:0.3.4=compileClasspath,runtimeClasspath
org.embulk:embulk-util-file:0.1.5=compileClasspath,runtimeClasspath
org.embulk:embulk-util-json:0.2.2=compileClasspath,runtimeClasspath
org.embulk:embulk-util-rubytime:0.3.3=compileClasspath,runtimeClasspath
org.embulk:embulk-util-timestamp:0.2.2=compileClasspath,runtimeClasspath
org.msgpack:msgpack-core:0.8.24=compileClasspath
Expand Down
82 changes: 51 additions & 31 deletions src/main/java/org/embulk/parser/json/JsonParserPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.embulk.parser.json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonPointer;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand All @@ -27,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;
Expand All @@ -42,6 +45,7 @@
import org.embulk.spi.PageOutput;
import org.embulk.spi.ParserPlugin;
import org.embulk.spi.Schema;
import org.embulk.spi.json.JsonValue;
import org.embulk.spi.type.TimestampType;
import org.embulk.spi.type.Types;
import org.embulk.util.config.Config;
Expand All @@ -51,8 +55,9 @@
import org.embulk.util.config.units.ColumnConfig;
import org.embulk.util.config.units.SchemaConfig;
import org.embulk.util.file.FileInputInputStream;
import org.embulk.util.json.CapturingPointers;
import org.embulk.util.json.JsonParseException;
import org.embulk.util.json.JsonParser;
import org.embulk.util.json.JsonValueParser;
import org.embulk.util.timestamp.TimestampFormatter;
import org.msgpack.core.Preconditions;
import org.msgpack.value.MapValue;
Expand All @@ -62,10 +67,6 @@
import org.slf4j.LoggerFactory;

public class JsonParserPlugin implements ParserPlugin {
public JsonParserPlugin() {
this.jsonParser = new JsonParser();
}

public enum InvalidEscapeStringPolicy {
PASSTHROUGH("PASSTHROUGH"),
SKIP("SKIP"),
Expand Down Expand Up @@ -161,19 +162,34 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu

final boolean stopOnInvalidRecord = task.getStopOnInvalidRecord();
final Map<Column, TimestampFormatter> timestampFormatters = new HashMap<>();
final Map<Column, String> jsonPointers = new HashMap<>();
final CapturingPointers capturingPointers = buildCapturingPointers(task.getSchemaConfig());
if (isUsingCustomSchema(task)) {
final SchemaConfig schemaConfig = task.getSchemaConfig().get();
timestampFormatters.putAll(newTimestampColumnFormattersAsMap(task, task.getSchemaConfig().get()));
jsonPointers.putAll(createJsonPointerMap(schema, schemaConfig));
}

try (final PageBuilder pageBuilder = Exec.getPageBuilder(Exec.getBufferAllocator(), schema, output);
FileInputInputStream in = new FileInputInputStream(input)) {
while (in.nextFile()) {
final String fileName = input.hintOfCurrentInputFileNameForLogging().orElse("-");
final JsonFactory jsonFactory = new JsonFactory();
jsonFactory.enable(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS);
jsonFactory.enable(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS);
final JsonValueParser.Builder parserBuilder = JsonValueParser.builder(jsonFactory);
if (task.getRoot().isPresent()) {
parserBuilder.root(JsonPointer.compile(task.getRoot().get()));
}

try (final PageBuilder pageBuilder = Exec.getPageBuilder(Exec.getBufferAllocator(), schema, output)) {
final String fileName = input.hintOfCurrentInputFileNameForLogging().orElse("-");
try (final FileInputInputStream in = new FileInputInputStream(input)) {
boolean evenOneJsonParsed = false;
while (in.nextFile()) {
try (final JsonValueParser parser = newParser(in, task, parserBuilder)) {
final JsonValue[] values = parser.captureJsonValues(capturingPointers);
}
}
}
pageBuilder.finish();
}

/*
try (JsonParser.Stream stream = newJsonStream(in, task)) {
Value originalValue;
while ((originalValue = stream.next()) != null) {
Expand Down Expand Up @@ -221,11 +237,11 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
throw new DataException(String.format("Failed to parse JSON: %s", fileName), e);
}
}

pageBuilder.finish();
}
*/
}

/*
private void addRecord(
PluginTask task,
PageBuilder pageBuilder,
Expand All @@ -250,11 +266,13 @@ private void addRecord(
}
pageBuilder.addRecord();
}
*/

/**
 * Tells whether the task carries a user-defined schema.
 *
 * @param task the plugin task whose schema config is inspected
 * @return {@code true} only when the schema config is present and is not empty
 */
private static boolean isUsingCustomSchema(PluginTask task) {
    // No schema config at all: there is nothing custom to use.
    if (!task.getSchemaConfig().isPresent()) {
        return false;
    }
    // A present-but-empty schema config is treated the same as an absent one.
    return !task.getSchemaConfig().get().isEmpty();
}

/*
private static void setValueWithSingleJsonColumn(PageBuilder pageBuilder, Schema schema, MapValue value) {
final Column column = schema.getColumn(0); // record column
pageBuilder.setJson(column, value);
Expand Down Expand Up @@ -337,6 +355,7 @@ public void jsonColumn(Column column) {
});
}
}
*/

private Value parseColumnValueWithOffsetInJsonPointer(String valueAsJsonString, String jsonPointer) {
try {
Expand All @@ -352,31 +371,27 @@ private Value parseColumnValueWithOffsetInJsonPointer(String valueAsJsonString,
}
}

private JsonParser.Stream newJsonStream(FileInputInputStream in, PluginTask task)
/**
 * Builds a {@link JsonValueParser} over the given input, applying the task's invalid-escape-string
 * policy to the raw text first.
 *
 * <p>For {@code SKIP} and {@code UNESCAPE}, the input is read line by line, each line is passed
 * through {@link #invalidEscapeStringFunction}, and the rewritten lines are joined and encoded as
 * UTF-8 before being handed to the builder. For {@code PASSTHROUGH} (and the {@code default}
 * branch), the input stream is handed to the builder unchanged.
 *
 * @param in the input stream to parse
 * @param task the plugin task, read here for its invalid-escape-string policy
 * @param builder the parser builder used to create the parser
 * @return the parser built by {@code builder} over the (possibly rewritten) input
 * @throws IOException declared by the signature
 */
private JsonValueParser newParser(final FileInputInputStream in, final PluginTask task, final JsonValueParser.Builder builder)
throws IOException {
final InvalidEscapeStringPolicy policy = task.getInvalidEscapeStringPolicy();
final InputStream inputStream;
switch (policy) {
case SKIP:
case UNESCAPE:
// NOTE(review): InputStreamReader is created without an explicit charset, so decoding uses the
// platform default while the result is re-encoded as UTF-8 below — confirm this is intended.
// NOTE(review): joining() uses no delimiter, so the line terminators dropped by lines() are
// not restored; the whole input is buffered in memory as a single byte array.
byte[] lines = new BufferedReader(new InputStreamReader(in))
final byte[] lines = new BufferedReader(new InputStreamReader(in))
.lines()
.map(invalidEscapeStringFunction(policy))
.collect(Collectors.joining())
.getBytes(StandardCharsets.UTF_8);
inputStream = new ByteArrayInputStream(lines);
break;
return builder.build(new ByteArrayInputStream(lines));
case PASSTHROUGH:
default:
// No rewriting requested: parse the original stream directly.
inputStream = in;
return builder.build(in);
}

return jsonParser.open(inputStream);
}

static Function<String, String> invalidEscapeStringFunction(final InvalidEscapeStringPolicy policy) {
return input -> {
Preconditions.checkNotNull(input);
Objects.requireNonNull(input);
if (policy == InvalidEscapeStringPolicy.PASSTHROUGH) {
return input;
}
Expand Down Expand Up @@ -433,19 +448,26 @@ static Function<String, String> invalidEscapeStringFunction(final InvalidEscapeS
};
}

private static Map<Column, String> createJsonPointerMap(Schema schema, SchemaConfig config) {
Map<Column, String> result = new HashMap<>();
final List<Column> columns = schema.getColumns();
/**
 * Builds the {@link CapturingPointers} from the optional schema config, adding one entry per
 * configured column in column order.
 *
 * <p>When the schema config is absent or empty, the builder is built without any entries.
 *
 * @param schemaConfigOptional the schema config of the task, possibly absent
 * @return the capturing pointers built from the configured columns
 */
private static CapturingPointers buildCapturingPointers(final Optional<SchemaConfig> schemaConfigOptional) {
final CapturingPointers.Builder builder = CapturingPointers.builder();
// Nothing to capture per column when no schema is configured.
if (!schemaConfigOptional.isPresent() || schemaConfigOptional.get().isEmpty()) {
return builder.build();
}

final SchemaConfig schemaConfig = schemaConfigOptional.get();
final List<ColumnConfig> columns = schemaConfig.getColumns();
for (int i = 0; i < columns.size(); i++) {
final Column column = columns.get(i);
final ColumnConfig columnConfig = config.getColumn(i);
final ColumnConfig columnConfig = columns.get(i);
// Map the column's free-form option section onto OptionalColumnConfig to read getElementAt().
final OptionalColumnConfig options =
CONFIG_MAPPER_FACTORY.createConfigMapper().map(columnConfig.getOption(), OptionalColumnConfig.class);
// A column with getElementAt() set is captured by that JSON pointer; any other column is
// captured as a direct member named after the column.
if (options.getElementAt().isPresent()) {
result.put(column, options.getElementAt().get());
builder.addJsonPointer(options.getElementAt().get());
} else {
builder.addDirectMemberName(columnConfig.getName());
}
}
return result;

return builder.build();
}

private static Map<Column, TimestampFormatter> newTimestampColumnFormattersAsMap(
Expand Down Expand Up @@ -480,6 +502,4 @@ static class JsonRecordValidateException extends DataException {
private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build();

private static final Pattern DIGITS_PATTERN = Pattern.compile("\\p{XDigit}+");

private final JsonParser jsonParser;
}

0 comments on commit 9c20304

Please sign in to comment.