-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathload.rs
328 lines (285 loc) · 10.8 KB
/
load.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
use ekg_namespace::PREFIX_CONCEPT;
// Copyright (c) 2018-2023, agnos.ai UK Ltd, all rights reserved.
//---------------------------------------------------------------
// We're using `#[test_log::test]` tests in this file which allows
// you to see the log in your test runner if you set the environment
// variable `RUST_LOG=info` (or debug or trace) and add `--nocapture`
// at the end of your cargo test command line.
// See https://crates.io/crates/test-log.
//
// TODO: Add test for "import axioms" (add test ontology)
use {
ekg_namespace::{
consts::{APPLICATION_N_QUADS, PREFIX_SKOS},
Graph,
Literal,
Namespace,
},
indoc::formatdoc,
iref::Iri,
rdfox_rs::{
DataStore,
DataStoreConnection,
FactDomain,
GraphConnection,
Namespaces,
Parameters,
PersistenceMode,
RoleCreds,
Server,
ServerConnection,
Statement,
Transaction,
},
// std::path::Path,
std::{ops::Deref, sync::Arc},
};
/// Declares the data store named `example` that the tests in this file use.
///
/// Data store persistence is switched off. Unless the `rdfox-7-0` feature is
/// enabled, persistence of roles is switched off as well.
fn test_define_data_store() -> Result<Arc<DataStore>, ekg_error::Error> {
    tracing::info!("test_define_data_store");

    // Settings shared by every supported RDFox version.
    let params = Parameters::empty()?.persist_datastore(PersistenceMode::Off)?;

    // The roles setting is only applied when not building with `rdfox-7-0`.
    #[cfg(not(feature = "rdfox-7-0"))]
    let params = params.persist_roles(PersistenceMode::Off)?;

    DataStore::declare_with_parameters("example", params)
}
/// Starts the server with default role credentials and persistence disabled.
///
/// Data store persistence is switched off. Unless the `rdfox-7-0` feature is
/// enabled, persistence of roles is switched off as well.
fn test_create_server() -> Result<Arc<Server>, ekg_error::Error> {
    tracing::info!("test_create_server");

    // Settings shared by every supported RDFox version.
    let params = Parameters::empty()?.persist_datastore(PersistenceMode::Off)?;

    // The roles setting is only applied when not building with `rdfox-7-0`.
    #[cfg(not(feature = "rdfox-7-0"))]
    let params = params.persist_roles(PersistenceMode::Off)?;

    // TODO: The line below causes a SIGSEGV error when using the static link
    // library .api_log_directory(Path::new("./tests"))?;
    Server::start_with_parameters(RoleCreds::default(), Some(params))
}
/// Opens a connection to `server` with the default role and checks it.
///
/// Logs the current thread count and memory use, asserts that at least one
/// thread is configured, then sets the thread count to 2 and verifies that
/// the new value is reported back.
///
/// # Errors
///
/// Propagates any error returned by the server connection calls.
fn test_create_server_connection(
    server: Arc<Server>,
) -> Result<Arc<ServerConnection>, ekg_error::Error> {
    tracing::info!("test_create_server_connection");
    let connection = server.connection_with_default_role()?;

    // Report the state of the freshly opened connection.
    let number_of_threads = connection.get_number_of_threads()?;
    tracing::info!("Using {number_of_threads} threads");
    let (max_used_bytes, available_bytes) = connection.get_memory_use()?;
    tracing::info!(
        "Memory use: max_used_bytes={max_used_bytes}, available_bytes={available_bytes}"
    );
    assert!(connection.get_number_of_threads()? > 0);

    // Specify how many threads the server should use during import of data
    // and reasoning, and make sure the setting took effect.
    connection.set_number_of_threads(2)?;
    assert_eq!(connection.get_number_of_threads()?, 2);

    Ok(connection)
}
/// Creates `data_store` on the server and returns a connection to it.
///
/// # Errors
///
/// Propagates any error from creating the data store or connecting to it.
#[allow(dead_code)]
fn test_create_data_store_connection(
    server_connection: &Arc<ServerConnection>,
    data_store: &Arc<DataStore>,
) -> Result<Arc<DataStoreConnection>, ekg_error::Error> {
    // Log this function's own name, as every other helper in this file does.
    tracing::info!("test_create_data_store_connection");
    server_connection.create_data_store(data_store)?;
    server_connection.connect_to_data_store(data_store)
}
/// Declares the graph `name` under the `graph:` namespace and wraps it in a
/// [`GraphConnection`] that uses `ds_connection`.
///
/// When `name` is `"test"`, the short and full display forms of the graph
/// IRI are additionally checked against their expected values.
///
/// # Panics
///
/// Panics if the hard-coded base IRI is rejected by `Iri::new`, or if one of
/// the display-form assertions fails.
fn test_create_graph(
    ds_connection: &Arc<DataStoreConnection>,
    name: &str,
) -> Result<Arc<GraphConnection>, ekg_error::Error> {
    tracing::info!("test_create_graph");

    let base_iri = Iri::new("https://whatever.kom/graph/").unwrap();
    let namespace = Namespace::declare_iref_iri("graph:", base_iri)?;
    let graph = Graph::declare(namespace, name);

    if name == "test" {
        assert_eq!(graph.to_string(), "graph:test");
        assert_eq!(
            graph.as_display_iri().to_string(),
            "<https://whatever.kom/graph/test>"
        );
    }

    let connection = GraphConnection::new(ds_connection.clone(), graph, None);
    Ok(connection)
}
/// Checks that the whole data store holds the expected number of triples
/// (1904) across all fact domains.
///
/// # Errors
///
/// Returns the underlying error if the count cannot be retrieved, so the
/// failing test reports the real cause instead of a bare assertion failure.
///
/// # Panics
///
/// Panics if the count differs from the expected value.
#[allow(dead_code)]
fn test_count_some_stuff_in_the_store(
    tx: &Arc<Transaction>,
    ds_connection: &Arc<DataStoreConnection>,
) -> Result<(), ekg_error::Error> {
    tracing::info!("test_count_some_stuff_in_the_store");
    let count = ds_connection.get_triples_count(tx, FactDomain::ALL)?;
    assert_eq!(count, 1904);
    Ok(())
}
/// Checks that the given graph holds the expected number of triples (37)
/// across all fact domains.
///
/// # Errors
///
/// Returns the underlying error if the count cannot be retrieved, so the
/// failing test reports the real cause instead of a bare assertion failure.
///
/// # Panics
///
/// Panics if the count differs from the expected value.
#[allow(dead_code)]
fn test_count_some_stuff_in_the_graph(
    tx: &Arc<Transaction>,
    graph_connection: &GraphConnection,
) -> Result<(), ekg_error::Error> {
    tracing::info!("test_count_some_stuff_in_the_graph");
    let count = graph_connection.get_triples_count(tx, FactDomain::ALL)?;
    assert_eq!(count, 37);
    Ok(())
}
#[allow(dead_code)]
fn test_cursor_with_lexical_value(
tx: &Arc<Transaction>,
graph_connection: &Arc<GraphConnection>,
) -> Result<(), ekg_error::Error> {
tracing::info!("test_cursor_with_lexical_value");
let graph = graph_connection.graph.as_display_iri();
let prefixes = Namespaces::empty()?;
let query = Statement::new(
&prefixes,
formatdoc!(
r##"
SELECT ?subject ?predicate ?object
FROM {graph}
WHERE {{
?subject a <https://ekgf.org/ontology/user-story/UserStory> ;
?predicate ?object
}}
"##,
)
.into(),
)?;
let mut cursor = query.cursor(
&graph_connection.data_store_connection,
&Parameters::empty()?.fact_domain(FactDomain::ASSERTED)?,
)?;
let count = cursor.consume(tx, 10000, |row| {
assert_eq!(row.opened.arity, 3);
for term_index in 0..row.opened.arity {
let value = row.lexical_value(term_index)?;
tracing::info!("{value:?}");
}
Result::<(), ekg_error::Error>::Ok(())
})?;
tracing::info!("Number of rows processed: {count}");
Ok(())
}
/// Evaluates the N-Quads query against the data store and streams the
/// result to standard output using the N-Quads media type.
///
/// # Errors
///
/// Propagates errors from building the query or evaluating it.
#[allow(dead_code)]
fn test_run_query_to_nquads_buffer(
    _tx: &Arc<Transaction>, // TODO: consider passing tx to evaluate_to_stream()
    ds_connection: &Arc<DataStoreConnection>,
) -> Result<(), ekg_error::Error> {
    tracing::info!("test_run_query_to_nquads_buffer");

    let nquads_query = Statement::nquads_query(&Namespaces::empty()?)?;
    let media_type = APPLICATION_N_QUADS.deref();
    ds_connection.evaluate_to_stream(std::io::stdout(), &nquads_query, media_type, None)?;

    tracing::info!("test_run_query_to_nquads_buffer passed");
    Ok(())
}
/// Builds the SPARQL statement that looks up a single class concept.
///
/// The query binds `?concept` to `concept_id` and, within the graph of
/// `graph_connection`, selects its key plus the optional label, comment,
/// data type, RDFS class and predicate, ordered by key.
///
/// The statement is only constructed here, not executed.
///
/// # Errors
///
/// Propagates errors from assembling the namespaces or creating the
/// statement.
pub fn get_concept(
    concept_id: &Literal,
    graph_connection: &Arc<GraphConnection>,
) -> Result<Statement, ekg_error::Error> {
    // Default namespaces plus the two prefixes this query needs.
    let prefixes = Namespaces::default_namespaces()?
        .add_namespace(&PREFIX_CONCEPT)?
        .add_namespace(&PREFIX_SKOS)?;
    let graph = graph_connection.graph.as_display_iri();

    let sparql = formatdoc!(
        r##"
            SELECT DISTINCT ?key ?label ?comment ?data_type ?rdfs_class ?predicate
            WHERE {{
            VALUES ?concept {{
            {concept_id}
            }}
            GRAPH {graph} {{
            ?concept a concept:ClassConcept ; concept:key ?key .
            OPTIONAL {{
            ?concept rdfs:label ?label
            }} .
            OPTIONAL {{
            ?concept rdfs:comment ?comment
            }} .
            OPTIONAL {{
            ?concept concept:rdfsClass ?rdfs_class
            }} .
            OPTIONAL {{
            ?concept concept:type ?data_type
            }} .
            OPTIONAL {{
            ?concept concept:predicate ?predicate
            }} .
            }}
            }}
            ORDER BY ?key
        "##
    );

    let statement = Statement::new(&prefixes, sparql.into())?;
    Ok(statement)
}
/// Queries the concept graph for one known concept and checks that at least
/// one row comes back.
///
/// The statement comes from [`get_concept`]; the cursor is opened over all
/// fact domains and consumes at most 1000 rows, logging each one.
///
/// # Errors
///
/// Propagates errors from building the concept identifier or statement,
/// opening the cursor, or consuming rows.
///
/// # Panics
///
/// Panics if the query returns no rows.
#[allow(dead_code)]
fn test_query_concepts(
    tx: &Arc<Transaction>,
    graph_connection: &Arc<GraphConnection>,
) -> Result<(), ekg_error::Error> {
    let concept_id = Literal::new_iri_reference_from_str(
        "https://placeholder.kg/id/concept-legal-person-legal-name-iri",
    )?;
    let statement = get_concept(&concept_id, graph_connection)?;
    let parameters = Parameters::empty()?.fact_domain(FactDomain::ALL)?;
    // Pass the parameters by reference; the cursor is opened on the
    // connection that the transaction belongs to.
    let mut cursor = statement.cursor(&tx.connection, &parameters)?;
    let count = cursor.consume(tx, 1000, |row| {
        tracing::info!("{row:?}");
        // for _term_index in 0..row.opened.arity {
        //     if let Some(_value) = row.lexical_value(term_index)? {
        //     } else {
        //         tracing::error!("{concept_id} is missing column
        // {term_index}:\n{statement:}"); }
        // }
        Ok::<(), ekg_error::Error>(())
    })?;
    assert!(count > 0);
    Ok(())
}
/// End-to-end test: starts a server, creates a data store, imports two
/// Turtle files into separate graphs, runs the count/query checks inside
/// read-only transactions, and finally deletes the data store.
///
/// Run the test with `RUST_LOG=info cargo test -- --nocapture` if you'd like to see what's going on.
#[test_log::test]
fn load_rdfox() -> Result<(), ekg_error::Error> {
    eprintln!("running test load_rdfox:");
    tracing::info!("load_rdfox test start");

    let srv_connection = test_create_server_connection(test_create_server()?)?;
    tracing::info!(
        "Server version is {}",
        srv_connection.get_version()?
    );

    let data_store = test_define_data_store()?;

    // Keep the pool in its own scope: when the scope ends, the pool and the
    // DataStoreConnection obtained from `pool_for()` are destroyed, which
    // has to happen before the data store can be deleted below.
    {
        let connection_pool = data_store.pool_for(&srv_connection, true, true)?;
        let ds_connection = connection_pool.get().unwrap();

        let test_graph = test_create_graph(&ds_connection, "test")?;
        let meta_graph = test_create_graph(&ds_connection, "meta")?;

        test_graph.import_data_from_file("tests/test.ttl")?;
        meta_graph.import_data_from_file("tests/concepts.ttl")?;

        // Checks against the store as a whole and against the `test` graph.
        Transaction::begin_read_only(&ds_connection)?.execute_and_rollback(|ref tx| {
            test_count_some_stuff_in_the_store(tx, &ds_connection)?;
            test_count_some_stuff_in_the_graph(tx, &test_graph)?;
            test_cursor_with_lexical_value(tx, &test_graph)?;
            test_run_query_to_nquads_buffer(tx, &ds_connection)
        })?;

        // Concept lookup against the `meta` graph, in its own transaction.
        Transaction::begin_read_only(&ds_connection)?
            .execute_and_rollback(|ref tx| test_query_concepts(tx, &meta_graph))?;
    }

    std::thread::sleep(std::time::Duration::from_millis(500)); // wait for connection pool threads to end

    tracing::info!("Datastore connection is now destroyed, now we can delete the data store:");
    srv_connection.delete_data_store(&data_store)?;

    tracing::info!("load_rdfox end");
    Ok(())
}