Skip to content

Commit e33c44b

Browse files
committed
Add Barebones of LOAD CSV Command
That Old AGE csv system was always broken. Start replacing it with something that works well. Not John well. Actually well.
1 parent 6d67dfc commit e33c44b

File tree

8 files changed

+377
-45
lines changed

8 files changed

+377
-45
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,16 @@ OBJS = src/backend/postgraph.o \
6363
src/backend/utils/ag_func.o \
6464
src/backend/utils/edge_searching.o \
6565
src/backend/utils/cache/ag_cache.o \
66+
src/backend/utils/csv/csv.o
6667

6768
EXTENSION = postgraph
6869

6970
DATA = postgraph--0.1.0.sql
7071

7172
REGRESS = new_cypher \
7273
cypher_create \
73-
cypher_merge
74+
cypher_merge \
75+
csv_load
7476

7577
srcdir=`pwd`
7678
POSTGIS_DIR ?= postgis_dir

postgraph--0.1.0.sql

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6901,3 +6901,12 @@ LANGUAGE C
69016901
COST 50000
69026902
AS 'MODULE_PATHNAME';
69036903

6904+
CREATE FUNCTION load_csv(gtype)
6905+
RETURNS SETOF gtype
6906+
CALLED ON NULL INPUT
6907+
STABLE
6908+
PARALLEL SAFE
6909+
LANGUAGE C
6910+
COST 50000
6911+
AS 'MODULE_PATHNAME';
6912+

regress/expected/csv_load.out

Whitespace-only changes.

regress/sql/csv_load.sql

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
2+
\! cp regress/test.csv regress/instance/data/
3+
4+
LOAD 'postgraph';
5+
6+
CREATE GRAPH cypher_create;
7+
USE GRAPH cypher_create;
8+
9+
CYPHER LOAD CSV './test.csv' AS n
10+
RETURN n;
11+
12+
--
13+
-- Clean up
14+
--
15+
DROP GRAPH cypher_create CASCADE;
16+
17+
--
18+
-- End
19+
--

src/backend/parser/cypher_clause.c

Lines changed: 81 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ static Node *transform_clause_for_join(cypher_parsestate *cpstate, cypher_clause
8484
static cypher_clause *convert_merge_to_match(cypher_merge *merge);
8585
static void transform_cypher_merge_mark_tuple_position(cypher_parsestate *cpstate, List *target_list, cypher_create_path *path);
8686

87+
// load csv
88+
static Query *transform_cypher_load_csv(cypher_parsestate *cpstate, cypher_clause *clause);
89+
8790
// transform
8891
#define PREV_CYPHER_CLAUSE_ALIAS "_"
8992
#define CYPHER_OPT_RIGHT_ALIAS "_R"
@@ -145,6 +148,8 @@ Query *transform_cypher_clause(cypher_parsestate *cpstate, cypher_clause *clause
145148
result = transform_cypher_create(cpstate, clause);
146149
} else if (is_ag_node(self, cypher_merge)) {
147150
result = transform_cypher_merge(cpstate, clause);
151+
} else if (is_ag_node(self, cypher_load_csv)) {
152+
result = transform_cypher_load_csv(cpstate, clause);
148153
} else {
149154
ereport(ERROR, (errmsg_internal("unexpected Node for cypher_clause")));
150155
}
@@ -156,6 +161,74 @@ Query *transform_cypher_clause(cypher_parsestate *cpstate, cypher_clause *clause
156161
}
157162

158163

164+
static FuncExpr *make_load_csv_clause_function(char *file_name) {
165+
166+
return makeFuncExpr(
167+
get_ag_func_oid("load_csv", 1, TEXTOID),
168+
GTYPEOID,
169+
list_make1(makeConst(TEXTOID, -1, InvalidOid, strlen(file_name), PointerGetDatum(file_name), false, false)),
170+
InvalidOid,
171+
InvalidOid,
172+
COERCE_EXPLICIT_CALL);
173+
}
174+
175+
static Query *transform_cypher_load_csv(cypher_parsestate *cpstate, cypher_clause *clause) {
176+
ParseState *pstate = (ParseState *)cpstate;
177+
cypher_load_csv *self = (cypher_create *)clause->self;
178+
179+
Query *query = makeNode(Query);
180+
query->commandType = CMD_SELECT;
181+
query->targetList = NIL;
182+
183+
if (clause->prev) {
184+
query->targetList = list_concat(query->targetList,
185+
expandNSItemAttrs(get_parse_state(cpstate),
186+
transform_prev_cypher_clause(cpstate, clause->prev, true),
187+
0,
188+
-1));
189+
}
190+
191+
192+
193+
ParseNamespaceItem *pnsi = add_srf_to_query(
194+
cpstate,
195+
makeFuncCall(
196+
list_make2(makeString("postgraph"), makeString("load_csv")),
197+
list_make1(makeString(self->file)),
198+
COERCE_EXPLICIT_CALL, -1),
199+
self->alias,
200+
list_make1(makeString("val")));
201+
202+
query->targetList = lappend(query->targetList,
203+
makeTargetEntry(
204+
scanNSItemForColumn(pstate, pnsi, 0, "val", -1),
205+
pstate->p_next_resno++,
206+
self->alias,
207+
false));
208+
209+
query->rtable = pstate->p_rtable;
210+
query->jointree = makeFromExpr(pstate->p_joinlist, NULL);
211+
212+
if (clause->next)
213+
return query;
214+
215+
216+
{
217+
cypher_parsestate *new_cpstate = make_cypher_parsestate(cpstate);
218+
Query *topquery = makeNode(Query);
219+
topquery->commandType = CMD_SELECT;
220+
topquery->targetList = NIL;
221+
222+
transform_cypher_clause_as_subquery_2(new_cpstate, query);
223+
224+
topquery->rtable = new_cpstate->pstate.p_rtable;
225+
topquery->jointree = makeFromExpr(new_cpstate->pstate.p_joinlist, NULL);
226+
227+
return topquery;
228+
}
229+
}
230+
231+
159232
/*
160233
* This function is similar to transformFromClause() that is called with a
161234
* single RangeSubselect.
@@ -2541,23 +2614,15 @@ static cypher_target_node *transform_merge_cypher_edge(cypher_parsestate *cpstat
25412614
table_close(rel, NoLock);
25422615

25432616
// props
2544-
if (edge->props) {
2545-
/* query->targetList = lappend(query->targetList,
2546-
makeTargetEntry(//BlackPink
2547-
add_volatile_wrapper(transform_cypher_expr(cpstate, edge->props, EXPR_KIND_INSERT_TARGET)),
2617+
if (edge->props)
2618+
query->targetList = lappend(query->targetList,
2619+
makeTargetEntry(
2620+
add_volatile_wrapper(target_node->prop_expr = transform_cypher_expr(cpstate, edge->props, EXPR_KIND_INSERT_TARGET)),
25482621
target_node->prop_attr_num = get_parse_state(cpstate)->p_next_resno++,
25492622
make_property_alias(edge->name),
25502623
false));
2551-
*/
2552-
query->targetList = lappend(query->targetList,
2553-
makeTargetEntry(
2554-
add_volatile_wrapper(scanNSItemForColumn(pstate, edge->pnsi, 0, AG_EDGE_COLNAME_PROPERTIES, -1)),
2555-
target_node->prop_attr_num = get_parse_state(cpstate)->p_next_resno++,
2556-
make_property_alias(edge->name),
2557-
false));
2558-
} else {
2624+
else
25592625
target_node->prop_attr_num = InvalidAttrNumber;
2560-
}
25612626

25622627
return target_node;
25632628
}
@@ -2570,11 +2635,6 @@ static cypher_target_node *transform_merge_cypher_node(cypher_parsestate *cpstat
25702635

25712636

25722637
if (node->name) {
2573-
/* ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2574-
errmsg("MERGE does not support variables"),
2575-
parser_errposition(get_parse_state(cpstate), node->location)));
2576-
*/
2577-
// /ereport(ERROR, (errmsg_internal("nodes in CREATE cannot be a variable")));
25782638
Var *var = NULL;
25792639
if ((var = colNameToVar(cpstate, node->name, false, -1)) ||
25802640
(target_node->id_attr_num = get_target_entry_resno(cpstate, query->targetList, make_id_alias(node->name))) != -1) {
@@ -2608,7 +2668,6 @@ static cypher_target_node *transform_merge_cypher_node(cypher_parsestate *cpstat
26082668
target_node->label_name = node->label;
26092669
} else {
26102670
target_node->label_name = "";
2611-
//node->label = AG_DEFAULT_LABEL_VERTEX;
26122671
}
26132672

26142673
target_node->flags |= CYPHER_TARGET_NODE_FLAG_INSERT;
@@ -2621,35 +2680,15 @@ static cypher_target_node *transform_merge_cypher_node(cypher_parsestate *cpstat
26212680
table_close(rel, NoLock);
26222681
target_node->adj_relid = lcd->vertex_adjlist;
26232682
// props
2624-
if (node->props) {
2625-
target_node->prop_expr = transform_cypher_expr(cpstate, node->props, EXPR_KIND_INSERT_TARGET);
2683+
if (node->props)
26262684
query->targetList = lappend(query->targetList,
26272685
makeTargetEntry(
2628-
add_volatile_wrapper(transform_cypher_expr(cpstate, node->props, EXPR_KIND_INSERT_TARGET)),
2686+
add_volatile_wrapper(target_node->prop_expr = transform_cypher_expr(cpstate, node->props, EXPR_KIND_INSERT_TARGET)),
26292687
target_node->prop_attr_num = get_parse_state(cpstate)->p_next_resno++,
26302688
make_property_alias(node->name),
26312689
false));
2632-
/*query->targetList = lappend(query->targetList,
2633-
makeTargetEntry(
2634-
colNameToVar(cpstate, make_property_alias(node->name), false, -1),
2635-
target_node->prop_attr_num = get_parse_state(cpstate)->p_next_resno++,
2636-
make_property_alias(node->name),
2637-
false));*/
2638-
/*
2639-
query->targetList = lappend(query->targetList,
2640-
makeTargetEntry(
2641-
add_volatile_wrapper(scanNSItemForColumn(get_parse_state(cpstate), refnameNamespaceItem(cpstate, NULL, node->name, -1, NULL), 0, AG_VERTEX_COLNAME_PROPERTIES, -1)),
2642-
target_node->prop_attr_num = get_parse_state(cpstate)->p_next_resno++,
2643-
make_property_alias(node->name),
2644-
false));
2645-
2646-
colNameToVar(cpstate, node->name, false, -1))
2647-
*/
2648-
2649-
2650-
} else {
2690+
else
26512691
target_node->prop_attr_num = InvalidAttrNumber;
2652-
}
26532692

26542693
return target_node;
26552694
}

src/backend/parser/cypher_gram.y

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11849,11 +11849,12 @@ clause:
1184911849

1185011850

1185111851
load_stmt:
11852-
LOAD CSV Sconst
11852+
LOAD CSV Sconst AS ColId
1185311853
{
1185411854
cypher_load_csv *n = make_ag_node(cypher_load_csv);
1185511855
n->file = $3;
11856-
11856+
n->alias = $5;
11857+
1185711858
$$ = n;
1185811859
};
1185911860

0 commit comments

Comments
 (0)