Skip to content

Commit c3313ae

Browse files
andro2157PhilipDeegan
authored andcommitted
Longitudinal preprocessor multithreading
uint to size_t oops updated swig updates for single swig header include directory template init
1 parent ea27359 commit c3313ae

11 files changed

+650
-53
lines changed

lib/cpp/preprocessing/longitudinal_features_lagger.cpp

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,38 @@
66

77
#include "tick/preprocessing/longitudinal_features_lagger.h"
88

9+
910
LongitudinalFeaturesLagger::LongitudinalFeaturesLagger(
10-
const SBaseArrayDouble2dPtrList1D &features, const SArrayULongPtr n_lags)
11-
: n_intervals(features[0]->n_rows()),
12-
n_lags(n_lags),
13-
n_samples(features.size()),
14-
n_observations(n_samples * n_intervals),
15-
n_features(features[0]->n_cols()),
16-
n_lagged_features(n_lags->sum() + n_lags->size()) {
17-
col_offset = ArrayULong(n_lags->size());
18-
col_offset.init_to_zero();
19-
if (n_features != n_lags->size()) {
20-
TICK_ERROR("Features matrix column number should match n_lags length.");
21-
}
22-
if ((*n_lags)[0] >= n_intervals) {
23-
TICK_ERROR("n_lags elements must be between 0 and (n_intervals - 1).");
24-
}
11+
ulong n_intervals,
12+
SArrayULongPtr _n_lags)
13+
: n_intervals(n_intervals),
14+
n_lags(_n_lags),
15+
n_features(_n_lags->size()),
16+
n_lagged_features(_n_lags->size() + _n_lags->sum()) {
17+
if (n_lags != nullptr) compute_col_offset(n_lags);
18+
}
19+
20+
void LongitudinalFeaturesLagger::compute_col_offset(const SArrayULongPtr n_lags) {
21+
ArrayULong col_offset_temp = ArrayULong(n_lags->size());
22+
col_offset_temp.init_to_zero();
2523
for (ulong i(1); i < n_lags->size(); i++) {
2624
if ((*n_lags)[i] >= n_intervals) {
2725
TICK_ERROR("n_lags elements must be between 0 and (n_intervals - 1).");
2826
}
29-
col_offset[i] = col_offset[i - 1] + (*n_lags)[i - 1] + 1;
27+
col_offset_temp[i] = col_offset_temp[i - 1] + (*n_lags)[i-1] + 1;
3028
}
29+
col_offset = col_offset_temp.as_sarray_ptr();
3130
}
3231

3332
void LongitudinalFeaturesLagger::dense_lag_preprocessor(ArrayDouble2d &features,
3433
ArrayDouble2d &out,
3534
ulong censoring) const {
35+
if (n_intervals != features.n_rows()) {
36+
TICK_ERROR("Features matrix rows count should match n_intervals.");
37+
}
38+
if (n_features != features.n_cols()) {
39+
TICK_ERROR("Features matrix column count should match n_lags length.");
40+
}
3641
if (out.n_cols() != n_lagged_features) {
3742
TICK_ERROR(
3843
"n_columns of &out should be equal to n_features + sum(n_lags).");
@@ -46,8 +51,9 @@ void LongitudinalFeaturesLagger::dense_lag_preprocessor(ArrayDouble2d &features,
4651
n_cols_feature = (*n_lags)[feature] + 1;
4752
for (ulong j = 0; j < n_intervals; j++) {
4853
row = j;
49-
col = col_offset[feature];
50-
value = features(row, feature);
54+
col = (*col_offset)[feature];
55+
// use view_row instead of (row, feature) to be const
56+
value = view_row(features, row)[feature];
5157
max_col = col + n_cols_feature;
5258
if (value != 0) {
5359
while (row < censoring && col < max_col) {
@@ -60,17 +66,22 @@ void LongitudinalFeaturesLagger::dense_lag_preprocessor(ArrayDouble2d &features,
6066
}
6167
}
6268

63-
void LongitudinalFeaturesLagger::sparse_lag_preprocessor(
64-
ArrayULong &row, ArrayULong &col, ArrayDouble &data, ArrayULong &out_row,
65-
ArrayULong &out_col, ArrayDouble &out_data, ulong censoring) const {
69+
void LongitudinalFeaturesLagger::sparse_lag_preprocessor(ArrayULong &row,
70+
ArrayULong &col,
71+
ArrayDouble &data,
72+
ArrayULong &out_row,
73+
ArrayULong &out_col,
74+
ArrayDouble &out_data,
75+
ulong censoring) const {
76+
// TODO: add checks here ? Or do them in Python ?
6677
ulong j(0), r, c, offset, new_col, max_col;
6778
double value;
6879

6980
for (ulong i = 0; i < data.size(); i++) {
7081
value = data[i];
7182
r = row[i];
7283
c = col[i];
73-
offset = col_offset[c];
84+
offset = (*col_offset)[c];
7485
max_col = offset + (*n_lags)[c] + 1;
7586
new_col = offset;
7687

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
// License: BSD 3 clause
2+
3+
//
4+
// Created by Maryan Morel on 15/05/2017.
5+
//
6+
7+
#include <mutex>
8+
#include "tick/preprocessing/longitudinal_features_lagger_mp.h"
9+
10+
LongitudinalFeaturesLagger_MP::LongitudinalFeaturesLagger_MP(ulong n_intervals,
11+
SArrayULongPtr _n_lags, size_t n_jobs)
12+
: LongitudinalPreprocessor(n_jobs),
13+
n_intervals(n_intervals),
14+
n_lags(_n_lags),
15+
n_features(_n_lags->size()),
16+
n_lagged_features(_n_lags->size() + _n_lags->sum()) {
17+
if (n_lags != nullptr) compute_col_offset(n_lags);
18+
n_output_features = get_n_output_features();
19+
}
20+
21+
void LongitudinalFeaturesLagger_MP::compute_col_offset(const SArrayULongPtr n_lags) {
22+
ArrayULong col_offset_temp = ArrayULong(n_lags->size());
23+
col_offset_temp.init_to_zero();
24+
for (ulong i(1); i < n_lags->size(); i++) {
25+
if ((*n_lags)[i] > n_intervals) { // (*n_lags)[i] >= n_intervals
26+
TICK_ERROR("n_lags elements must be between 0 and n_intervals."); // (n_intervals - 1) was
27+
// actually wrong?
28+
}
29+
col_offset_temp[i] = col_offset_temp[i - 1] + (*n_lags)[i - 1] + 1;
30+
}
31+
col_offset = col_offset_temp.as_sarray_ptr();
32+
}
33+
34+
void LongitudinalFeaturesLagger_MP::dense_lag_preprocessor(ArrayDouble2d &features,
35+
ArrayDouble2d &out,
36+
ulong censoring) const {
37+
if (n_intervals != features.n_rows()) {
38+
TICK_ERROR("Features matrix rows count should match n_intervals.");
39+
}
40+
if (n_features != features.n_cols()) {
41+
TICK_ERROR("Features matrix column count should match n_lags length.");
42+
}
43+
if (out.n_cols() != n_lagged_features) {
44+
TICK_ERROR("n_columns of &out should be equal to n_features + sum(n_lags).");
45+
}
46+
if (out.n_rows() != n_intervals) {
47+
TICK_ERROR("n_rows of &out is inconsistent with n_intervals");
48+
}
49+
ulong n_cols_feature, row, col, max_col;
50+
double value;
51+
for (ulong feature = 0; feature < n_features; feature++) {
52+
n_cols_feature = (*n_lags)[feature] + 1;
53+
for (ulong j = 0; j < n_intervals; j++) {
54+
row = j;
55+
col = (*col_offset)[feature];
56+
// use view_row instead of (row, feature) to be const
57+
value = view_row(features, row)[feature];
58+
max_col = col + n_cols_feature;
59+
if (value != 0) {
60+
while (row < censoring && col < max_col) {
61+
out[row * n_lagged_features + col] = value;
62+
row++;
63+
col++;
64+
}
65+
}
66+
}
67+
}
68+
}
69+
70+
void LongitudinalFeaturesLagger_MP::sparse_lag_preprocessor(ArrayULong &row, ArrayULong &col,
71+
ArrayDouble &data, ArrayULong &out_row,
72+
ArrayULong &out_col,
73+
ArrayDouble &out_data,
74+
ulong censoring) const {
75+
// TODO: add checks here ? Or do them in Python ?
76+
if (row.size() != col.size() || col.size() != data.size() || data.size() != row.size())
77+
TICK_ERROR("row, col and data arrays should have the same size (coo matrix)");
78+
if (out_row.size() != out_col.size() || out_col.size() != out_data.size() ||
79+
out_data.size() != out_row.size())
80+
TICK_ERROR("out_row, out_col and out_data arrays should have the same size (coo matrix)");
81+
82+
ulong j(0), r, c, offset, new_col, max_col;
83+
double value;
84+
85+
for (ulong i = 0; i < data.size(); i++) {
86+
value = data[i];
87+
r = row[i];
88+
c = col[i];
89+
offset = (*col_offset)[c];
90+
max_col = offset + (*n_lags)[c] + 1;
91+
new_col = offset;
92+
93+
while (r < censoring && new_col < max_col) {
94+
out_row[j] = r;
95+
out_col[j] = new_col;
96+
out_data[j] = value;
97+
r++;
98+
new_col++;
99+
j++;
100+
}
101+
}
102+
}
103+
104+
ulong LongitudinalFeaturesLagger_MP::get_n_output_features() {
105+
ulong arraysum = 0;
106+
std::vector<ulong> arraysumint;
107+
for (size_t i = 0; i < n_lags->size(); i++) arraysumint.push_back((*n_lags)[i] + 1);
108+
for (ulong i : arraysumint) arraysum += i;
109+
return arraysum;
110+
}
111+
112+
SSparseArrayDouble2dPtr LongitudinalFeaturesLagger_MP::sparse_lagger(
113+
SSparseArrayDouble2dPtr &feature_matrix, ulong censoring_i) {
114+
if (censoring_i > n_intervals || censoring_i < 1)
115+
TICK_ERROR("censoring shoud be an integer in [1, n_intervals]");
116+
117+
CooMatrix<double> coo(feature_matrix);
118+
119+
// TODO FIX this is wrong, but the coo.toSparse removes all the useless zero data.
120+
ulong estimated_nnz = coo.nnz * n_output_features;
121+
122+
// std::cout << "estimated nnz : " << estimated_nnz << " nnz : " << coo.nnz
123+
// << " arraysum : " << n_output_features << std::endl;
124+
125+
ArrayULong out_row(estimated_nnz);
126+
ArrayULong out_col(estimated_nnz);
127+
ArrayDouble out_data(estimated_nnz);
128+
129+
out_row.init_to_zero();
130+
out_col.init_to_zero();
131+
out_data.init_to_zero();
132+
133+
sparse_lag_preprocessor(coo.rows, coo.cols, coo.data, out_row, out_col, out_data, censoring_i);
134+
135+
coo.rows = out_row;
136+
coo.cols = out_col;
137+
coo.data = out_data;
138+
139+
return coo.toSparse(n_intervals, n_output_features);
140+
}
141+
142+
void LongitudinalFeaturesLagger_MP::transform_thread_dense(
143+
std::vector<ArrayDouble2d> splited_features, std::vector<ArrayDouble2d> &output,
144+
std::mutex &thread_mutex, std::vector<ulong> splited_censoring) {
145+
for (ulong i = 0; i < splited_features.size(); i++) {
146+
ArrayDouble2d transformed(splited_features[i].n_rows(), n_output_features);
147+
transformed.init_to_zero();
148+
dense_lag_preprocessor(splited_features[i], transformed, splited_censoring[i]);
149+
thread_mutex.lock(); // just in case, needed ?
150+
output.push_back(transformed);
151+
thread_mutex.unlock();
152+
}
153+
}
154+
155+
void LongitudinalFeaturesLagger_MP::transform_thread_sparse(
156+
std::vector<SSparseArrayDouble2dPtr> splited_features,
157+
std::vector<SSparseArrayDouble2dPtr> &output, std::mutex &thread_mutex,
158+
std::vector<ulong> splited_censoring) {
159+
for (ulong i = 0; i < splited_features.size(); i++) {
160+
SSparseArrayDouble2dPtr transformed = sparse_lagger(splited_features[i], splited_censoring[i]);
161+
thread_mutex.lock(); // just in case, needed ?
162+
output.push_back(transformed);
163+
thread_mutex.unlock();
164+
}
165+
}
166+
167+
std::vector<ArrayDouble2d> LongitudinalFeaturesLagger_MP::transform(
168+
std::vector<ArrayDouble2d> features, std::vector<ulong> censoring) {
169+
if (features.empty()) TICK_ERROR("features is empty");
170+
171+
if (censoring.empty()) {
172+
for (ulong i = 0; i < features.size(); i++) censoring.push_back(n_intervals);
173+
}
174+
175+
if (features.size() != censoring.size())
176+
TICK_ERROR("features size and censoring size doesn\'t match");
177+
178+
std::pair<ulong, ulong> base_shape = {features[0].n_rows(), features[0].n_cols()};
179+
for (ArrayDouble2d f : features)
180+
if (f.n_rows() != base_shape.first || f.n_cols() != base_shape.second)
181+
TICK_ERROR("All the elements of features should have the same shape");
182+
183+
size_t thread_count = std::min((size_t)features.size(), n_jobs);
184+
std::vector<std::vector<ArrayDouble2d>> splited_features = split_vector(features, thread_count);
185+
features.clear();
186+
std::vector<std::vector<ulong>> splited_censoring = split_vector(censoring, thread_count);
187+
censoring.clear();
188+
189+
if (splited_features.size() != splited_censoring.size())
190+
TICK_ERROR("Unexepected error : splited_features.size() != splited_censoring.size()");
191+
if (splited_features.size() != thread_count || splited_censoring.size() != thread_count)
192+
TICK_ERROR(
193+
"Unexepected error : splited_features.size() != thread_count || splited_censoring.size() "
194+
"!= thread_count");
195+
if (splited_features.empty() || splited_censoring.empty())
196+
TICK_ERROR("Unexepected error : splited_features.empty() || splited_censoring.empty()");
197+
198+
std::vector<ArrayDouble2d> output;
199+
std::vector<std::thread> threads;
200+
std::mutex thread_mutex;
201+
202+
for (size_t i = 0; i < thread_count; i++)
203+
threads.push_back(std::thread(&LongitudinalFeaturesLagger_MP::transform_thread_dense, this,
204+
splited_features[i], std::ref(output), std::ref(thread_mutex),
205+
splited_censoring[i]));
206+
207+
splited_features.clear();
208+
splited_censoring.clear();
209+
210+
for (size_t i = 0; i < threads.size(); i++) threads[i].join();
211+
212+
return output;
213+
}
214+
215+
std::vector<SSparseArrayDouble2dPtr> LongitudinalFeaturesLagger_MP::transform(
216+
std::vector<SSparseArrayDouble2dPtr> features, std::vector<ulong> censoring) {
217+
if (features.empty()) TICK_ERROR("features is empty");
218+
219+
if (censoring.empty()) {
220+
for (ulong i = 0; i < features.size(); i++) censoring.push_back(n_intervals);
221+
}
222+
223+
if (features.size() != censoring.size())
224+
TICK_ERROR("features size and censoring size doesn\'t match");
225+
226+
std::pair<ulong, ulong> base_shape = {features[0]->n_rows(), features[0]->n_cols()};
227+
n_intervals = base_shape.first;
228+
for (SSparseArrayDouble2dPtr f : features)
229+
if (f->n_rows() != base_shape.first || f->n_cols() != base_shape.second)
230+
TICK_ERROR("All the elements of features should have the same shape");
231+
232+
size_t thread_count = std::min((size_t)features.size(), n_jobs);
233+
std::vector<std::vector<SSparseArrayDouble2dPtr>> splited_features =
234+
split_vector(features, thread_count);
235+
features.clear();
236+
std::vector<std::vector<ulong>> splited_censoring = split_vector(censoring, thread_count);
237+
censoring.clear();
238+
239+
if (splited_features.size() != splited_censoring.size())
240+
TICK_ERROR("Unexepected error : splited_features.size() != splited_censoring.size()");
241+
if (splited_features.empty() || splited_censoring.empty())
242+
TICK_ERROR("Unexepected error : splited_features.empty() || splited_censoring.empty()");
243+
244+
std::vector<SSparseArrayDouble2dPtr> output;
245+
std::vector<std::thread> threads;
246+
std::mutex thread_mutex;
247+
248+
for (size_t i = 0; i < thread_count; i++)
249+
threads.push_back(std::thread(&LongitudinalFeaturesLagger_MP::transform_thread_sparse, this,
250+
splited_features[i], std::ref(output), std::ref(thread_mutex),
251+
splited_censoring[i]));
252+
253+
splited_features.clear();
254+
splited_censoring.clear();
255+
256+
for (size_t i = 0; i < threads.size(); i++) threads[i].join();
257+
258+
return output;
259+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#include "tick/preprocessing/longitudinal_preprocessor.h"
2+
3+
template<typename T>
4+
std::vector<std::vector<T>> LongitudinalPreprocessor::split_vector(std::vector<T> array, size_t chunks) {
5+
if (chunks == 0)
6+
TICK_ERROR("Chunks size cannot be zero");
7+
8+
ulong size = array.size();
9+
10+
ulong closest_multi = size;
11+
for (; closest_multi%chunks != 0; closest_multi++) {}
12+
13+
ulong chunk_size = ceil(closest_multi / chunks);
14+
15+
std::vector<std::vector<T>> out;
16+
for (ulong i = 0; i < size; i++) {
17+
ulong new_size = std::min(size - chunk_size * i, chunk_size);
18+
if (new_size == 0)
19+
break;
20+
std::vector<T> tmp(new_size);
21+
for (ulong j = 0; j < new_size; j++) {
22+
tmp[j] = array[i*chunk_size+j];
23+
}
24+
out.push_back(tmp);
25+
}
26+
return out;
27+
}
28+
29+
template DLL_PUBLIC
30+
std::vector<std::vector<SSparseArrayDouble2dPtr>>
31+
LongitudinalPreprocessor::split_vector(std::vector<SSparseArrayDouble2dPtr> array, size_t chunks);
32+
33+
template DLL_PUBLIC
34+
std::vector<std::vector<ArrayDouble2d>>
35+
LongitudinalPreprocessor::split_vector(std::vector<ArrayDouble2d> array, size_t chunks);
36+
37+
template DLL_PUBLIC
38+
std::vector<std::vector<ulong>>
39+
LongitudinalPreprocessor::split_vector(std::vector<ulong> array, size_t chunks);
40+

0 commit comments

Comments
 (0)