Skip to content

Commit c86a1a3

Browse files
committed
enable schema inference and csv rewrites for export-pg-from-queries
1 parent 7050e96 commit c86a1a3

File tree

12 files changed

+111
-67
lines changed

12 files changed

+111
-67
lines changed

Diff for: src/main/java/com/amazonaws/services/neptune/ExportPropertyGraphFromGremlinQueries.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void run() {
123123
CsvPrinterOptions csvPrinterOptions = CsvPrinterOptions.builder().setIncludeTypeDefinitions(includeTypeDefinitions).build();
124124
JsonPrinterOptions jsonPrinterOptions = JsonPrinterOptions.builder().setStrictCardinality(true).build();
125125

126-
PropertyGraphTargetConfig targetConfig = target.config(directories, new PrinterOptions(csvPrinterOptions, jsonPrinterOptions), structuredOutput);
126+
PropertyGraphTargetConfig targetConfig = target.config(directories, new PrinterOptions(csvPrinterOptions, jsonPrinterOptions));
127127
NamedQueriesCollection namedQueries = getNamedQueriesCollection(queries, queriesFile, queriesResource);
128128

129129
GraphSchema graphSchema = new GraphSchema();
@@ -168,8 +168,10 @@ public void run() {
168168
directories.writeResultsDirectoryPathAsMessage(target.description(), target);
169169

170170
queriesResource.writeResourcePathAsMessage(target);
171-
configFileResource.save(graphSchema, false);
172-
statsFileResource.save(exportStats, graphSchema);
171+
if (structuredOutput) {
172+
configFileResource.save(graphSchema, false);
173+
statsFileResource.save(exportStats, graphSchema);
174+
}
173175

174176
directories.writeRootDirectoryPathAsReturnValue(target);
175177
onExportComplete(directories, exportStats, cluster, graphSchema);

Diff for: src/main/java/com/amazonaws/services/neptune/cli/PropertyGraphTargetModule.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,14 @@ public PropertyGraphTargetModule(Target target) {
4242
}
4343

4444
public PropertyGraphTargetConfig config(Directories directories, PrinterOptions printerOptions){
45-
return config(directories, printerOptions, true);
46-
}
47-
48-
public PropertyGraphTargetConfig config(Directories directories, PrinterOptions printerOptions, boolean inferSchema){
4945

5046
if (mergeFiles && (format != PropertyGraphExportFormat.csv && format != PropertyGraphExportFormat.csvNoHeaders)){
5147
throw new IllegalArgumentException("Merge files is only supported for CSV formats for export-pg");
5248
}
5349

5450
KinesisConfig kinesisConfig = new KinesisConfig(this);
5551

56-
return new PropertyGraphTargetConfig(directories, kinesisConfig, printerOptions, format, getOutput(), mergeFiles, perLabelDirectories, inferSchema);
52+
return new PropertyGraphTargetConfig(directories, kinesisConfig, printerOptions, format, getOutput(), mergeFiles, perLabelDirectories, true);
5753
}
5854

5955
public String description(){

Diff for: src/main/java/com/amazonaws/services/neptune/propertygraph/io/QueryJob.java

+18-7
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ private Map<GraphElementType, GraphElementSchemas> export() throws ExecutionExce
8989

9090
Collection<FileSpecificLabelSchemas> nodesFileSpecificLabelSchemas = new ArrayList<>();
9191
Collection<FileSpecificLabelSchemas> edgesFileSpecificLabelSchemas = new ArrayList<>();
92+
Collection<FileSpecificLabelSchemas> queryResultsFileSpecificLabelSchemas = new ArrayList<>();
9293

9394
LabelsFilter nodeLabelFilter = new AllLabels(NodeLabelStrategy.nodeLabelsOnly);
9495
LabelsFilter edgeLabelFilter = new AllLabels(EdgeLabelStrategy.edgeLabelsOnly);
@@ -97,7 +98,7 @@ private Map<GraphElementType, GraphElementSchemas> export() throws ExecutionExce
9798
if (exportSpecification.getGraphElementType() == GraphElementType.nodes) {
9899
nodeLabelFilter = exportSpecification.getLabelsFilter();
99100
}
100-
else {
101+
else if (exportSpecification.getGraphElementType() == GraphElementType.edges) {
101102
edgeLabelFilter = exportSpecification.getLabelsFilter();
102103
}
103104
}
@@ -139,18 +140,28 @@ private Map<GraphElementType, GraphElementSchemas> export() throws ExecutionExce
139140
Map<GraphElementType, FileSpecificLabelSchemas> result = future.get();
140141
nodesFileSpecificLabelSchemas.add(result.get(GraphElementType.nodes));
141142
edgesFileSpecificLabelSchemas.add(result.get(GraphElementType.edges));
143+
queryResultsFileSpecificLabelSchemas.add(result.get(GraphElementType.queryResults));
142144
}
143145

144146
RewriteCommand rewriteCommand = targetConfig.createRewriteCommand(concurrencyConfig, featureToggles);
145147
Map<GraphElementType, GraphElementSchemas> graphElementSchemas = new HashMap<>();
146148

147-
for(ExportSpecification exportSpecification : exportSpecifications) {
148-
MasterLabelSchemas masterLabelSchemas = exportSpecification.createMasterLabelSchemas(
149-
exportSpecification.getGraphElementType().equals(GraphElementType.nodes) ?
150-
nodesFileSpecificLabelSchemas : edgesFileSpecificLabelSchemas
151-
);
149+
if (structuredOutput) {
150+
for(ExportSpecification exportSpecification : exportSpecifications) {
151+
MasterLabelSchemas masterLabelSchemas = exportSpecification.createMasterLabelSchemas(
152+
exportSpecification.getGraphElementType().equals(GraphElementType.nodes) ?
153+
nodesFileSpecificLabelSchemas : edgesFileSpecificLabelSchemas
154+
);
155+
try {
156+
graphElementSchemas.put(exportSpecification.getGraphElementType(), rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
157+
} catch (Exception e) {
158+
throw new RuntimeException(e);
159+
}
160+
}
161+
} else {
162+
MasterLabelSchemas masterLabelSchemas = new MasterLabelSchemas(queryResultsFileSpecificLabelSchemas, GraphElementType.queryResults);
152163
try {
153-
graphElementSchemas.put(exportSpecification.getGraphElementType(), rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
164+
graphElementSchemas.put(GraphElementType.queryResults, rewriteCommand.execute(masterLabelSchemas).toGraphElementSchemas());
154165
} catch (Exception e) {
155166
throw new RuntimeException(e);
156167
}

Diff for: src/main/java/com/amazonaws/services/neptune/propertygraph/io/QueryTask.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public Map<GraphElementType, FileSpecificLabelSchemas> call() throws Exception {
9191
Map<GraphElementType, FileSpecificLabelSchemas> fileSpecificLabelSchemasMap = new HashMap<>();
9292
fileSpecificLabelSchemasMap.put(GraphElementType.nodes, new FileSpecificLabelSchemas());
9393
fileSpecificLabelSchemasMap.put(GraphElementType.edges, new FileSpecificLabelSchemas());
94+
fileSpecificLabelSchemasMap.put(GraphElementType.queryResults, new FileSpecificLabelSchemas());
9495

9596
try {
9697

@@ -186,6 +187,7 @@ private void executeQuery(NamedQuery namedQuery,
186187
}
187188
else {
188189
ResultsHandler resultsHandler = new ResultsHandler(
190+
fileSpecificLabelSchemasMap.get(GraphElementType.queryResults),
189191
new Label(namedQuery.name()),
190192
labelWriters,
191193
writerFactory,
@@ -219,8 +221,10 @@ private class ResultsHandler implements GraphElementHandler<Map<?, ?>> {
219221
private final Map<Label, LabelWriter<Map<?, ?>>> labelWriters;
220222
private final QueriesWriterFactory writerFactory;
221223
private final GraphElementSchemas graphElementSchemas;
224+
private final FileSpecificLabelSchemas fileSpecificLabelSchemas;
222225

223-
private ResultsHandler(Label label,
226+
private ResultsHandler(FileSpecificLabelSchemas fileSpecificLabelSchemas,
227+
Label label,
224228
Map<Label, LabelWriter<Map<?, ?>>> labelWriters,
225229
QueriesWriterFactory writerFactory,
226230
GraphElementSchemas graphElementSchemas) {
@@ -229,6 +233,7 @@ private ResultsHandler(Label label,
229233
this.writerFactory = writerFactory;
230234

231235
this.graphElementSchemas = graphElementSchemas;
236+
this.fileSpecificLabelSchemas = fileSpecificLabelSchemas;
232237
}
233238

234239
private void createWriter(Map<?, ?> properties, boolean allowStructuralElements) {
@@ -242,7 +247,10 @@ private void createWriter(Map<?, ?> properties, boolean allowStructuralElements)
242247
PropertyGraphPrinter propertyGraphPrinter =
243248
writerFactory.createPrinter(Directories.fileName(label.fullyQualifiedLabel(), index), labelSchema, targetConfig);
244249

245-
labelWriters.put(label, writerFactory.createLabelWriter(propertyGraphPrinter, label));
250+
LabelWriter<Map<?, ?>> labelWriter = writerFactory.createLabelWriter(propertyGraphPrinter, label);
251+
252+
labelWriters.put(label, labelWriter);
253+
fileSpecificLabelSchemas.add(labelWriter.outputId(), targetConfig.format(), labelSchema);
246254

247255
} catch (IOException e) {
248256
throw new RuntimeException(e);

Diff for: src/main/java/com/amazonaws/services/neptune/propertygraph/io/RewriteAndMergeCsv.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ private MasterLabelSchema rewriteAndMerge(PropertyGraphTargetConfig targetConfig
157157

158158
if (graphElementType == GraphElementType.nodes) {
159159
printer.printNode(record.get("~id"), Arrays.asList(record.get("~label").split(";")));
160-
} else {
160+
} else if (graphElementType == GraphElementType.edges) {
161161
if (label.hasFromAndToLabels()) {
162162
printer.printEdge(
163163
record.get("~id"),

Diff for: src/main/java/com/amazonaws/services/neptune/propertygraph/io/RewriteCsv.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ private MasterLabelSchema rewrite(PropertyGraphTargetConfig targetConfig,
158158

159159
if (graphElementType == GraphElementType.nodes) {
160160
target.printNode(record.get("~id"), Arrays.asList(record.get("~label").split(";")));
161-
} else {
161+
} else if (graphElementType == GraphElementType.edges) {
162162
if (label.hasFromAndToLabels()) {
163163
target.printEdge(
164164
record.get("~id"),

Diff for: src/main/java/com/amazonaws/services/neptune/propertygraph/schema/ExportSpecification.java

+1-34
Original file line numberDiff line numberDiff line change
@@ -121,40 +121,7 @@ public ExportPropertyGraphTask createExportTask(GraphSchema graphSchema,
121121
}
122122

123123
public MasterLabelSchemas createMasterLabelSchemas(Collection<FileSpecificLabelSchemas> fileSpecificLabelSchemasCollection) {
124-
125-
Set<Label> labels = new HashSet<>();
126-
127-
fileSpecificLabelSchemasCollection.forEach(s -> labels.addAll(s.labels()));
128-
129-
Map<Label, MasterLabelSchema> masterLabelSchemas = new HashMap<>();
130-
131-
for (Label label : labels) {
132-
133-
LabelSchema masterLabelSchema = new LabelSchema(label);
134-
Collection<FileSpecificLabelSchema> fileSpecificLabelSchemas = new ArrayList<>();
135-
136-
for (FileSpecificLabelSchemas fileSpecificLabelSchemasForTask : fileSpecificLabelSchemasCollection) {
137-
if (fileSpecificLabelSchemasForTask.hasSchemasForLabel(label)) {
138-
Set<LabelSchema> labelSchemaSet = new HashSet<>();
139-
for (FileSpecificLabelSchema fileSpecificLabelSchema :
140-
fileSpecificLabelSchemasForTask.fileSpecificLabelSchemasFor(label)) {
141-
fileSpecificLabelSchemas.add(fileSpecificLabelSchema);
142-
labelSchemaSet.add(fileSpecificLabelSchema.labelSchema());
143-
}
144-
for (LabelSchema labelSchema : labelSchemaSet) {
145-
masterLabelSchema = masterLabelSchema.union(labelSchema);
146-
}
147-
}
148-
}
149-
150-
masterLabelSchemas.put(
151-
label,
152-
new MasterLabelSchema(masterLabelSchema, fileSpecificLabelSchemas));
153-
154-
155-
}
156-
157-
return new MasterLabelSchemas(masterLabelSchemas, graphElementType);
124+
return new MasterLabelSchemas(fileSpecificLabelSchemasCollection, graphElementType);
158125
}
159126

160127
public Collection<ExportSpecification> splitByLabel() {

Diff for: src/main/java/com/amazonaws/services/neptune/propertygraph/schema/GraphElementType.java

+19
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@
1919
import com.amazonaws.services.neptune.propertygraph.NodesClient;
2020
import com.amazonaws.services.neptune.propertygraph.io.EdgesWriterFactory;
2121
import com.amazonaws.services.neptune.propertygraph.io.NodesWriterFactory;
22+
import com.amazonaws.services.neptune.propertygraph.io.QueriesWriterFactory;
2223
import com.amazonaws.services.neptune.propertygraph.io.WriterFactory;
2324
import com.amazonaws.services.neptune.propertygraph.io.result.PGResult;
25+
import com.amazonaws.services.neptune.util.NotImplementedException;
2426
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
2527

2628
import java.util.Arrays;
2729
import java.util.Collection;
30+
import java.util.Collections;
2831

2932
public enum GraphElementType {
3033

@@ -59,6 +62,22 @@ public GraphClient<? extends PGResult> graphClient(GraphTraversalSource g, boole
5962
public WriterFactory<? extends PGResult> writerFactory() {
6063
return new EdgesWriterFactory();
6164
}
65+
},
66+
queryResults {
67+
@Override
68+
public Collection<String> tokenNames() {
69+
return Collections.emptyList();
70+
}
71+
72+
@Override
73+
public GraphClient<? extends PGResult> graphClient(GraphTraversalSource g, boolean tokensOnly, ExportStats stats, FeatureToggles featureToggles) {
74+
throw new NotImplementedException();
75+
}
76+
77+
@Override
78+
public WriterFactory writerFactory() {
79+
return new QueriesWriterFactory();
80+
}
6281
};
6382

6483
public abstract Collection<String> tokenNames();

Diff for: src/main/java/com/amazonaws/services/neptune/propertygraph/schema/MasterLabelSchemas.java

+41
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@
1414

1515
import com.amazonaws.services.neptune.propertygraph.Label;
1616

17+
import java.util.ArrayList;
1718
import java.util.Collection;
19+
import java.util.HashMap;
20+
import java.util.HashSet;
1821
import java.util.Map;
22+
import java.util.Set;
1923

2024
public class MasterLabelSchemas {
2125

@@ -27,6 +31,10 @@ public MasterLabelSchemas(Map<Label, MasterLabelSchema> masterLabelSchemas, Grap
2731
this.graphElementType = graphElementType;
2832
}
2933

34+
public MasterLabelSchemas(Collection<FileSpecificLabelSchemas> fileSpecificLabelSchemasCollection, GraphElementType graphElementType) {
35+
this(convertFileSpecificLabelSchemas(fileSpecificLabelSchemasCollection), graphElementType);
36+
}
37+
3038
public Collection<MasterLabelSchema> schemas() {
3139
return masterLabelSchemas.values();
3240
}
@@ -42,4 +50,37 @@ public GraphElementSchemas toGraphElementSchemas() {
4250
}
4351
return graphElementSchemas;
4452
}
53+
54+
private static Map<Label, MasterLabelSchema> convertFileSpecificLabelSchemas(Collection<FileSpecificLabelSchemas> fileSpecificLabelSchemasCollection) {
55+
Set<Label> labels = new HashSet<>();
56+
57+
fileSpecificLabelSchemasCollection.forEach(s -> labels.addAll(s.labels()));
58+
59+
Map<Label, MasterLabelSchema> masterLabelSchemas = new HashMap<>();
60+
61+
for (Label label : labels) {
62+
63+
LabelSchema masterLabelSchema = new LabelSchema(label);
64+
Collection<FileSpecificLabelSchema> fileSpecificLabelSchemas = new ArrayList<>();
65+
66+
for (FileSpecificLabelSchemas fileSpecificLabelSchemasForTask : fileSpecificLabelSchemasCollection) {
67+
if (fileSpecificLabelSchemasForTask.hasSchemasForLabel(label)) {
68+
Set<LabelSchema> labelSchemaSet = new HashSet<>();
69+
for (FileSpecificLabelSchema fileSpecificLabelSchema :
70+
fileSpecificLabelSchemasForTask.fileSpecificLabelSchemasFor(label)) {
71+
fileSpecificLabelSchemas.add(fileSpecificLabelSchema);
72+
labelSchemaSet.add(fileSpecificLabelSchema.labelSchema());
73+
}
74+
for (LabelSchema labelSchema : labelSchemaSet) {
75+
masterLabelSchema = masterLabelSchema.union(labelSchema);
76+
}
77+
}
78+
}
79+
80+
masterLabelSchemas.put(
81+
label,
82+
new MasterLabelSchema(masterLabelSchema, fileSpecificLabelSchemas));
83+
}
84+
return masterLabelSchemas;
85+
}
4586
}

Diff for: src/test/java/com/amazonaws/services/neptune/ExportPgFromQueriesIntegrationTest.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -36,45 +36,45 @@ public void testExportPgFromQueries() {
3636
}
3737

3838
@Test
39-
public void testExportPgFromQueriesWithStaggeredResults() {
39+
public void testExportPgFromQueriesNoHeaders() {
4040
final String[] command = {"export-pg-from-queries", "-e", neptuneEndpoint,
41-
"-d", outputDir.getPath(),
42-
"-q", "airport=g.inject(['code':'SEA', 'city':'Seattle', 'runways': 3], ['city': 'Vancouver', 'code': 'YVR'], ['code': 'YYC'])"
41+
"-d", outputDir.getPath(), "--format", "csvNoHeaders",
42+
"-q", "airport=g.V().hasLabel('airport').has('runways', gt(2)).project('code', 'runways', 'city', 'country').by('code').by('runways').by('city').by('country')"
4343
};
4444
final NeptuneExportRunner runner = new NeptuneExportRunner(command);
4545
runner.run();
4646

4747
final File resultDir = outputDir.listFiles()[0];
4848

49-
assertEquivalentResults(new File("src/test/resources/IntegrationTest/testExportPgFromQueriesWithStaggeredResults"), resultDir);
49+
assertEquivalentResults(new File("src/test/resources/IntegrationTest/testExportPgFromQueriesNoHeaders"), resultDir);
5050
}
5151

5252
@Test
53-
public void testExportPgFromQueriesWithStaggeredResultsNoHeaders() {
53+
public void testExportPgFromQueriesWithStaggeredResults() {
5454
final String[] command = {"export-pg-from-queries", "-e", neptuneEndpoint,
55-
"-d", outputDir.getPath(), "--format", "csvNoHeaders",
56-
"-q", "airport=g.inject(['code':'SEA', 'city':'Seattle', 'runways': 3], ['city': 'Vancouver', 'code': 'YVR'], ['code': 'YYC'])"
55+
"-d", outputDir.getPath(),
56+
"-q", "airport=g.inject(['code': 'YYC'], ['city': 'Vancouver', 'code': 'YVR'], ['code':'SEA', 'city':'Seattle', 'runways': 3])"
5757
};
5858
final NeptuneExportRunner runner = new NeptuneExportRunner(command);
5959
runner.run();
6060

6161
final File resultDir = outputDir.listFiles()[0];
6262

63-
assertEquivalentResults(new File("src/test/resources/IntegrationTest/testExportPgFromQueriesWithStaggeredResultsNoHeaders"), resultDir);
63+
assertEquivalentResults(new File("src/test/resources/IntegrationTest/testExportPgFromQueriesWithStaggeredResults"), resultDir);
6464
}
6565

6666
@Test
67-
public void testExportPgFromQueriesNoHeaders() {
67+
public void testExportPgFromQueriesWithStaggeredResultsNoHeaders() {
6868
final String[] command = {"export-pg-from-queries", "-e", neptuneEndpoint,
6969
"-d", outputDir.getPath(), "--format", "csvNoHeaders",
70-
"-q", "airport=g.V().hasLabel('airport').has('runways', gt(2)).project('code', 'runways', 'city', 'country').by('code').by('runways').by('city').by('country')"
70+
"-q", "airport=g.inject(['code':'SEA', 'city':'Seattle', 'runways': 3], ['city': 'Vancouver', 'code': 'YVR'], ['code': 'YYC'])"
7171
};
7272
final NeptuneExportRunner runner = new NeptuneExportRunner(command);
7373
runner.run();
7474

7575
final File resultDir = outputDir.listFiles()[0];
7676

77-
assertEquivalentResults(new File("src/test/resources/IntegrationTest/testExportPgFromQueriesNoHeaders"), resultDir);
77+
assertEquivalentResults(new File("src/test/resources/IntegrationTest/testExportPgFromQueriesWithStaggeredResultsNoHeaders"), resultDir);
7878
}
7979

8080
@Test
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
[ {
22
"name" : "airport",
3-
"queries" : [ "g.inject(['code':'SEA', 'city':'Seattle', 'runways': 3], ['city': 'Vancouver', 'code': 'YVR'], ['code': 'YYC'])" ]
3+
"queries" : [ "g.inject(['code': 'YYC'], ['city': 'Vancouver', 'code': 'YVR'], ['code':'SEA', 'city':'Seattle', 'runways': 3])" ]
44
} ]
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
code,city,runways
2-
"SEA","Seattle",3
3-
"YVR","Vancouver",
42
"YYC",,
3+
"YVR","Vancouver",
4+
"SEA","Seattle",3

0 commit comments

Comments
 (0)