Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1235,44 +1235,5 @@ http://www.springframework.org/schema/util/spring-util-4.0.xsd">
</set>
</property>
</bean>

<!-- Define the set of valid 'relationship' values.
     Built as a java.util.HashSet by SetFactoryBean and looked up by bean id
     ('protobufedge.table.relationships') from ProtobufEdgeDataTypeHandler.setup().
     An EdgeNode whose 'relationship' property is not listed here fails validation
     and no edge pair is built for it. -->
<bean id="protobufedge.table.relationships" class="org.springframework.beans.factory.config.SetFactoryBean">
<property name="targetSetClass" value="java.util.HashSet"/>
<property name="sourceSet">
<set>
<value>PERSON</value>
<value>CHARACTER</value>
<value>SHOW</value>
<value>TO</value>
<value>FROM</value>
<value>EQUIVALENCE</value>
<value>USERNAME</value>
<value>USERID</value>
<value>PAGEID</value>
<value>REVISIONID</value>
<value>CONTRIBUTORID</value>
<value>CONTRIBUTOR</value>
<value>COMMENT</value>
<value>USER</value>
<value>PAGE</value>
<value>REDIRECT</value>
<value>REVISION</value>
</set>
</property>
</bean>

<!-- Define the set of valid 'collection' values.
     Built as a java.util.HashSet by SetFactoryBean and looked up by bean id
     ('protobufedge.table.collections') from ProtobufEdgeDataTypeHandler.setup().
     An EdgeNode whose 'collection' property is not listed here fails validation
     and no edge pair is built for it. -->
<bean id="protobufedge.table.collections" class="org.springframework.beans.factory.config.SetFactoryBean">
<property name="targetSetClass" value="java.util.HashSet"/>
<property name="sourceSet">
<set>
<value>CSV_METADATA</value>
<value>TVMAZE_METADATA</value>
<value>WIKI_PAGE_METADATA</value>
</set>
</property>
</bean>

</beans>

Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ public class ProtobufEdgeDataTypeHandler<KEYIN,KEYOUT,VALUEOUT> implements Exten

public static final String EDGE_SPRING_CONFIG = "protobufedge.spring.config";

public static final String EDGE_SPRING_RELATIONSHIPS = "protobufedge.table.relationships";
public static final String EDGE_SPRING_COLLECTIONS = "protobufedge.table.collections";

public static final String EDGE_SETUP_FAILURE_POLICY = "protobufedge.setup.default.failurepolicy";
public static final String EDGE_PROCESS_FAILURE_POLICY = "protobufedge.process.default.failurepolicy";

Expand Down Expand Up @@ -152,9 +149,6 @@ public class ProtobufEdgeDataTypeHandler<KEYIN,KEYOUT,VALUEOUT> implements Exten

protected EdgeKeyVersioningCache versioningCache = null;

protected HashSet<String> edgeRelationships = new HashSet<>();
protected HashSet<String> collectionType = new HashSet<>();

long futureDelta, pastDelta;
long newFormatStartDate;

Expand Down Expand Up @@ -250,27 +244,13 @@ public void setup(Configuration conf) {

registry.put(EDGE_DEFAULT_DATA_TYPE, null);

// HashSet<String> edgeRelationships, collectionType;

if (ctx.containsBean(EDGE_SPRING_RELATIONSHIPS) && ctx.containsBean(EDGE_SPRING_COLLECTIONS)) {
edgeRelationships.addAll((HashSet<String>) ctx.getBean(EDGE_SPRING_RELATIONSHIPS));
collectionType.addAll((HashSet<String>) ctx.getBean(EDGE_SPRING_COLLECTIONS));
} else {
log.error("Edge relationships and or collection types are not configured correctly. Cannot build edge definitions");
if (setUpFailurePolicy == FailurePolicy.FAIL_JOB) {
throw new RuntimeException("Missing some spring configurations");
} else {
return; // no edges will be created but the ingest job will continue
}
}

for (Entry<String,Type> entry : registry.entrySet()) {
if (ctx.containsBean(entry.getKey())) {
EdgeDefinitionConfigurationHelper thing = (EdgeDefinitionConfigurationHelper) ctx.getBean(entry.getKey());

// Always call init first before getting getting edge defs. This performs validation on the config file
// Always call init first before getting edge defs. This performs validation on the config file
// and builds the edge pairs/groups
thing.init(edgeRelationships, collectionType);
thing.init();

edges.put(entry.getKey(), thing);
if (thing.getEnrichmentTypeMappings() != null) {
Expand Down Expand Up @@ -430,14 +410,6 @@ public void setEdges(Map<String,EdgeDefinitionConfigurationHelper> edges) {
this.edges = edges;
}

/**
 * Returns the disallowlist field lookup held by this handler.
 * <p>
 * The map is keyed by data type name; each value is the set of field names checked by
 * {@code isDisallowlistField}. The internal map itself is returned (no defensive copy), so
 * changes made by the caller are visible to this handler.
 *
 * @return the data-type-to-field-names disallowlist map
 */
public Map<String,Set<String>> getDisallowlistFieldLookup() {
return disallowlistFieldLookup;
}

/**
 * Returns the disallowlist value lookup held by this handler.
 * <p>
 * The internal map itself is returned (no defensive copy), so changes made by the caller are
 * visible to this handler.
 * NOTE(review): presumably keyed by data type name with sets of disallowed field values, mirroring
 * the field lookup - confirm against the code that populates it.
 *
 * @return the disallowlist value map
 */
public Map<String,Set<String>> getDisallowlistValueLookup() {
return disallowlistValueLookup;
}

private boolean isDisallowlistField(String dataType, String fieldName) {
if (disallowlistFieldLookup.containsKey(dataType)) {
return this.disallowlistFieldLookup.get(dataType).contains(fieldName);
Expand Down Expand Up @@ -1441,8 +1413,4 @@ public RawRecordMetadata getMetadata() {
// TODO Auto-generated method stub
return null;
}

/**
 * Replaces the edge key versioning cache used by this handler.
 * <p>
 * The given reference is stored as-is; no validation or copy is performed, so {@code null} is accepted.
 *
 * @param versioningCache
 *            the cache instance to use from now on
 */
public void setVersioningCache(EdgeKeyVersioningCache versioningCache) {
this.versioningCache = versioningCache;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package datawave.ingest.mapreduce.handler.edge.define;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -38,7 +37,7 @@ public void setEdges(List<EdgeDefinition> edges) {
this.edges = edges;
}

public void init(HashSet<String> edgeRelationships, HashSet<String> collectionType) {
public void init() {
// Sanity check before we continue
validateRequiredVariablesSet();

Expand All @@ -50,29 +49,16 @@ public void init(HashSet<String> edgeRelationships, HashSet<String> collectionTy
int nPieces = edgeNodes.size();
for (int ii = 0; ii < nPieces - 1; ii++) {
for (int jj = ii + 1; jj < nPieces; jj++) {

if (validateEdgeNode(edgeNodes.get(ii), edgeRelationships, collectionType)
&& validateEdgeNode(edgeNodes.get(jj), edgeRelationships, collectionType)) {

EdgeDefinition edgePair = buildEdgePair(edgeDefinition, edgeNodes.get(ii), edgeNodes.get(jj));

realEdges.add(edgePair);
}

EdgeDefinition edgePair = buildEdgePair(edgeDefinition, edgeNodes.get(ii), edgeNodes.get(jj));
realEdges.add(edgePair);
}
}
} else if (edgeDefinition.getGroupPairs() != null) {

for (EdgeNode group1 : edgeDefinition.getGroupPairs().getGroup1()) {
for (EdgeNode group2 : edgeDefinition.getGroupPairs().getGroup2()) {

if (validateEdgeNode(group1, edgeRelationships, collectionType) && validateEdgeNode(group2, edgeRelationships, collectionType)) {

EdgeDefinition groupPair = buildEdgePair(edgeDefinition, group1, group2);

realEdges.add(groupPair);
}

EdgeDefinition groupPair = buildEdgePair(edgeDefinition, group1, group2);
realEdges.add(groupPair);
}
}
} else {
Expand Down Expand Up @@ -130,16 +116,6 @@ public EdgeDefinition buildEdgePair(EdgeDefinition edgeDefinition, EdgeNode node
return edgePair;
}

/**
 * Checks that an edge node refers only to configured relationship and collection values.
 * <p>
 * The collection is only looked up when the relationship is already known. When either lookup
 * fails an error is logged naming both values of the node.
 *
 * @param node
 *            the edge node taken from the edge definition configuration
 * @param edgeRelationships
 *            the set of permitted relationship names
 * @param collectionType
 *            the set of permitted collection names
 * @return {@code true} when both the node's relationship and collection are permitted, {@code false} otherwise
 */
private boolean validateEdgeNode(EdgeNode node, HashSet<String> edgeRelationships, HashSet<String> collectionType) {
    // Short-circuit evaluation: the collection check runs only for a known relationship.
    final boolean recognised = edgeRelationships.contains(node.getRelationship()) && collectionType.contains(node.getCollection());

    if (!recognised) {
        log.error("Edge Definition in config file does not have a matching edge relationship and collection type for Relationship: "
                        + node.getRelationship() + " and Collection: " + node.getCollection());
    }

    return recognised;
}

// Sanity checks to make sure configuration was set up properly
private void validateRequiredVariablesSet() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,17 @@ http://www.springframework.org/schema/util/spring-util-4.0.xsd">
<bean class="datawave.ingest.mapreduce.handler.edge.define.EdgeNode">
<!-- (data-driven key component) for a given CSV record, use its EDGE_VERTEX_FROM value for the "SOURCE" component of the DataWave edge key -->
<property name="selector" value="EDGE_VERTEX_FROM"/>
<!-- (config-driven key component) use this value to denote how this vertex is related to the "SINK" vertex,
defined by 'protobufedge.table.relationships' bean below -->
<!-- (config-driven key component) use this value to denote how this vertex is related to the "SINK" vertex -->
<property name="relationship" value="FROM"/>
<!-- (config-driven key component) use this value to identify the origin or taxonomy of the datasource,
defined by 'protobufedge.table.collections' bean below-->
<!-- (config-driven key component) use this value to identify the origin or taxonomy of the datasource -->
<property name="collection" value="MY_CSV_DATA"/>
</bean>
<bean class="datawave.ingest.mapreduce.handler.edge.define.EdgeNode">
<!-- (data-driven key component) for a given CSV record, use its EDGE_VERTEX_TO value for the "SINK" component of the DataWave edge key -->
<property name="selector" value="EDGE_VERTEX_TO"/>
<!-- (config-driven key component) use this value to denote how this vertex is related to the "SOURCE" vertex,
defined by 'protobufedge.table.relationships' bean below -->
<!-- (config-driven key component) use this value to denote how this vertex is related to the "SOURCE" vertex -->
<property name="relationship" value="TO"/>
<!-- (config-driven key component) use this value to identify the origin or taxonomy of the datasource,
defined by 'protobufedge.table.collections' bean below-->
<!-- (config-driven key component) use this value to identify the origin or taxonomy of the datasource -->
<property name="collection" value="MY_CSV_DATA"/>
</bean>
</list>
Expand All @@ -56,29 +52,5 @@ http://www.springframework.org/schema/util/spring-util-4.0.xsd">
</bean>

<bean id="csv" class="datawave.ingest.mapreduce.handler.edge.define.EdgeDefinitionConfigurationHelper" parent="mycsv"/>


<!-- Define the set of valid 'relationship' values.
     Built as a java.util.HashSet by SetFactoryBean; the 'relationship' property of each
     EdgeNode bean in this file (TO / FROM) must appear in this set to pass validation. -->
<bean id="protobufedge.table.relationships" class="org.springframework.beans.factory.config.SetFactoryBean">
<property name="targetSetClass" value="java.util.HashSet"/>
<property name="sourceSet">
<set>
<value>TO</value>
<value>FROM</value>
</set>
</property>
</bean>

<!-- Define the set of valid 'collection' values.
     Built as a java.util.HashSet by SetFactoryBean; the 'collection' property of each
     EdgeNode bean in this file (MY_CSV_DATA) must appear in this set to pass validation. -->
<bean id="protobufedge.table.collections" class="org.springframework.beans.factory.config.SetFactoryBean">
<property name="targetSetClass" value="java.util.HashSet"/>
<property name="sourceSet">
<set>
<value>MY_CSV_DATA</value>
<value>UNKNOWN</value>
</set>
</property>
</bean>

</beans>

Loading
Loading