Skip to content

Commit 2a5945a

Browse files
PhilipDeegan and MaryanMorel
authored and committed
Parallelize longitudinal preprocessors
1 parent 3e7d36e commit 2a5945a

10 files changed

+256
-175
lines changed

lib/cpp/preprocessing/longitudinal_features_lagger.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@
99

1010
LongitudinalFeaturesLagger::LongitudinalFeaturesLagger(
1111
ulong n_intervals,
12-
SArrayULongPtr n_lags)
12+
SArrayULongPtr _n_lags)
1313
: n_intervals(n_intervals),
14-
n_lags(n_lags),
15-
n_features(n_lags->size()),
16-
n_lagged_features(n_features + n_lags->sum()),
17-
col_offset(nullptr){
14+
n_lags(_n_lags),
15+
n_features(_n_lags->size()),
16+
n_lagged_features(_n_lags->size() + _n_lags->sum()){
1817
if (n_lags != nullptr) compute_col_offset(n_lags);
1918
};
2019

@@ -33,8 +32,8 @@ void LongitudinalFeaturesLagger::compute_col_offset(const SArrayULongPtr n_lags)
3332
void LongitudinalFeaturesLagger::dense_lag_preprocessor(ArrayDouble2d &features,
3433
ArrayDouble2d &out,
3534
ulong censoring) const {
36-
if (n_features != features.n_rows()) {
37-
TICK_ERROR("Features matrix rows count should match n_lags length.");
35+
if (n_intervals != features.n_rows()) {
36+
TICK_ERROR("Features matrix rows count should match n_intervals.");
3837
}
3938
if (n_features != features.n_cols()) {
4039
TICK_ERROR("Features matrix column count should match n_lags length.");

lib/cpp/preprocessing/sparse_longitudinal_features_product.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,6 @@
77
#include "tick/preprocessing/sparse_longitudinal_features_product.h"
88
#include <map>
99

10-
SparseLongitudinalFeaturesProduct::SparseLongitudinalFeaturesProduct(
11-
const SBaseArrayDouble2dPtrList1D &features)
12-
: n_features(features[0]->n_cols()) {}
13-
1410
ulong SparseLongitudinalFeaturesProduct::get_feature_product_col(
1511
ulong col1, ulong col2, ulong n_cols) const {
1612
if (col1 > col2) { // ensure we have the right order as the following formula

lib/include/tick/preprocessing/longitudinal_features_lagger.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ class LongitudinalFeaturesLagger {
1717
SArrayULongPtr n_lags;
1818
ulong n_features;
1919
ulong n_lagged_features;
20-
SArrayULongPtr col_offset;
20+
SArrayULongPtr col_offset = nullptr;
2121

2222
public:
2323
// This exists solely for cereal/swig
24-
LongitudinalFeaturesLagger(): LongitudinalFeaturesLagger(0, nullptr) {};
24+
LongitudinalFeaturesLagger() = default;
2525

2626
LongitudinalFeaturesLagger(ulong n_intervals,
2727
SArrayULongPtr n_lags);
@@ -45,11 +45,9 @@ class LongitudinalFeaturesLagger {
4545

4646
Array<ulong> temp_n_lags, temp_col_offset;
4747
ar(cereal::make_nvp("n_lags", temp_n_lags));
48-
// ar(cereal::make_nvp("col_offset", temp_col_offset));
4948

5049
n_lags = temp_n_lags.as_sarray_ptr();
51-
if (n_lags != nullptr) compute_col_offset(n_lags);
52-
// col_offset = temp_col_offset.as_sarray_ptr();
50+
col_offset = temp_col_offset.as_sarray_ptr();
5351
}
5452

5553

@@ -59,7 +57,7 @@ class LongitudinalFeaturesLagger {
5957
ar(CEREAL_NVP(n_features));
6058
ar(CEREAL_NVP(n_lagged_features));
6159
ar(cereal::make_nvp("n_lags", *n_lags));
62-
// ar(cereal::make_nvp("col_offset", *col_offset));
60+
ar(cereal::make_nvp("col_offset", *col_offset));
6361
}
6462
};
6563

lib/include/tick/preprocessing/sparse_longitudinal_features_product.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ class SparseLongitudinalFeaturesProduct {
1616
ulong n_features;
1717

1818
public:
19+
// This exists solely for cereal/swig
20+
explicit SparseLongitudinalFeaturesProduct() = default;
21+
1922
explicit SparseLongitudinalFeaturesProduct(
20-
const SBaseArrayDouble2dPtrList1D &features);
23+
const ulong n_features): n_features(n_features) {};
2124

2225
inline ulong get_feature_product_col(ulong col1, ulong col2,
2326
ulong n_cols) const;
@@ -28,7 +31,12 @@ class SparseLongitudinalFeaturesProduct {
2831
ArrayDouble &out_data) const;
2932

3033
template <class Archive>
31-
void serialize(Archive &ar) {
34+
void load(Archive &ar) {
35+
ar(CEREAL_NVP(n_features));
36+
}
37+
38+
template <class Archive>
39+
void save(Archive &ar) const {
3240
ar(CEREAL_NVP(n_features));
3341
}
3442
};

lib/swig/preprocessing/longitudinal_features_lagger.i

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
class LongitudinalFeaturesLagger {
1010

1111
public:
12-
// This exists solely for cereal/swig
13-
LongitudinalFeaturesLagger(): LongitudinalFeaturesLagger(0, nullptr) {};
12+
// This exists solely for cereal/swig
13+
LongitudinalFeaturesLagger();
1414

1515
LongitudinalFeaturesLagger(ulong n_intervals,
1616
SArrayULongPtr n_lags);

lib/swig/preprocessing/sparse_longitudinal_features_product.i

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
class SparseLongitudinalFeaturesProduct {
1010

1111
public:
12-
SparseLongitudinalFeaturesProduct(const SBaseArrayDouble2dPtrList1D &features);
12+
// This exists solely for cereal/swig
13+
SparseLongitudinalFeaturesProduct();
14+
15+
SparseLongitudinalFeaturesProduct(const ulong n_features);
1316

1417
void sparse_features_product(ArrayULong &row,
1518
ArrayULong &col,

tick/preprocessing/longitudinal_features_lagger.py

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import numpy as np
44
import scipy.sparse as sps
55
from tick.preprocessing.base import LongitudinalPreprocessor
6-
from .build.preprocessing import LongitudinalFeaturesLagger\
6+
from tick.preprocessing.build.preprocessing import LongitudinalFeaturesLagger\
77
as _LongitudinalFeaturesLagger
8-
from .utils import check_longitudinal_features_consistency,\
8+
from tick.preprocessing.utils import check_longitudinal_features_consistency,\
99
check_censoring_consistency
1010
from multiprocessing.pool import Pool
11+
from copy import deepcopy
12+
from functools import partial, partialmethod
1113

1214

1315
class LongitudinalFeaturesLagger(LongitudinalPreprocessor):
@@ -76,15 +78,12 @@ class LongitudinalFeaturesLagger(LongitudinalPreprocessor):
7678
"_n_intervals": {
7779
"writable": False
7880
},
79-
"_cpp_preprocessor": {
80-
"writable": False
81-
},
8281
"_fitted": {
8382
"writable": False
8483
}
8584
}
8685

87-
def __init__(self, n_lags, n_jobs=1):
86+
def __init__(self, n_lags, n_jobs=-1):
8887
LongitudinalPreprocessor.__init__(self, n_jobs=n_jobs)
8988
if not isinstance(n_lags, np.ndarray) or n_lags.dtype != 'uint64':
9089
raise ValueError(
@@ -93,15 +92,13 @@ def __init__(self, n_lags, n_jobs=1):
9392
self._n_init_features = None
9493
self._n_output_features = None
9594
self._n_intervals = None
96-
self._cpp_preprocessor = None
9795
self._fitted = False
9896

9997
def _reset(self):
10098
"""Resets the object its initial construction state."""
10199
self._set("_n_init_features", None)
102100
self._set("_n_output_features", None)
103101
self._set("_n_intervals", None)
104-
self._set("_cpp_preprocessor", None)
105102
self._set("_fitted", False)
106103

107104
def fit(self, features, labels=None, censoring=None):
@@ -138,10 +135,7 @@ def fit(self, features, labels=None, censoring=None):
138135
self._set("_n_init_features", n_init_features)
139136
self._set("_n_intervals", n_intervals)
140137
self._set("_n_output_features", int((self.n_lags + 1).sum()))
141-
self._set("_cpp_preprocessor",
142-
_LongitudinalFeaturesLagger(features, self.n_lags))
143138
self._set("_fitted", True)
144-
145139
return self
146140

147141
def transform(self, features, labels=None, censoring=None):
@@ -175,49 +169,58 @@ def transform(self, features, labels=None, censoring=None):
175169
base_shape = (self._n_intervals, self._n_init_features)
176170
features = check_longitudinal_features_consistency(
177171
features, base_shape, "float64")
178-
if sps.issparse(features[0]):
179-
if self.n_jobs > 1:
180-
with Pool(self.n_jobs) as pool:
181-
X_with_lags = pool.starmap(self._sparse_lagger, zip(features, censoring))
182-
pool.start()
183-
pool.join()
184-
else:
185-
X_with_lags = [
186-
self._sparse_lagger(x, int(censoring[i]))
187-
for i, x in enumerate(features)
188-
]
189-
# TODO: Don't get why int() is required here as censoring_i is uint64
190-
else:
191-
if self.n_jobs > 1:
192-
with Pool(self.n_jobs) as pool:
193-
X_with_lags = pool.starmap(self._dense_lagger, zip(features, censoring))
194-
pool.start()
195-
pool.join()
196-
else:
197-
X_with_lags = [
198-
self._dense_lagger(x, int(censoring[i]))
199-
for i, x in enumerate(features)
200-
]
172+
173+
initializer = partial(self._inject_cpp_object,
174+
n_intervals=self._n_intervals, n_lags=self.n_lags)
175+
callback = self._sparse_lagger if sps.issparse(features[0]) \
176+
else self._dense_lagger
177+
callback = partial(callback, n_intervals=self._n_intervals,
178+
n_output_features=self._n_output_features,
179+
n_lags=self.n_lags)
180+
181+
with Pool(self.n_jobs, initializer=initializer) as pool:
182+
X_with_lags = pool.starmap(callback, zip(features, censoring))
201183

202184
return X_with_lags, labels, censoring
203185

204-
def _dense_lagger(self, feature_matrix, censoring_i):
205-
output = np.zeros((self._n_intervals, self._n_output_features),
186+
@staticmethod
187+
def _inject_cpp_object(n_intervals, n_lags):
188+
"""Creates a global instance of the CPP preprocessor object.
189+
190+
WARNING: to be used only as a multiprocessing.Pool initializer.
191+
In multiprocessing context, each process has its own namespace, so using
192+
global is not as bad as it seems. Still, it requires to proceed with
193+
caution.
194+
"""
195+
global _cpp_preprocessor
196+
_cpp_preprocessor = _LongitudinalFeaturesLagger(n_intervals, n_lags)
197+
198+
@staticmethod
199+
def _dense_lagger(feature_matrix, censoring_i, n_intervals,
200+
n_output_features, n_lags):
201+
"""Creates a lagged version of a dense matrix representing longitudinal
202+
features."""
203+
global _cpp_preprocessor
204+
output = np.zeros((n_intervals, n_output_features),
206205
dtype="float64")
207-
self._cpp_preprocessor.dense_lag_preprocessor(feature_matrix, output,
208-
censoring_i)
206+
_cpp_preprocessor.dense_lag_preprocessor(feature_matrix, output,
207+
int(censoring_i))
209208
return output
210209

211-
def _sparse_lagger(self, feature_matrix, censoring_i):
212-
pp = self._cpp_preprocessor
210+
@staticmethod
211+
def _sparse_lagger(feature_matrix, censoring_i, n_intervals,
212+
n_output_features, n_lags):
213+
"""Creates a lagged version of a sparse matrix representing longitudinal
214+
features."""
215+
global _cpp_preprocessor
213216
coo = feature_matrix.tocoo()
214-
estimated_nnz = coo.nnz * int((self.n_lags + 1).sum())
217+
estimated_nnz = coo.nnz * int((n_lags + 1).sum())
215218
out_row = np.zeros((estimated_nnz,), dtype="uint64")
216219
out_col = np.zeros((estimated_nnz,), dtype="uint64")
217220
out_data = np.zeros((estimated_nnz,), dtype="float64")
218-
pp.sparse_lag_preprocessor(
221+
_cpp_preprocessor.sparse_lag_preprocessor(
219222
coo.row.astype("uint64"), coo.col.astype("uint64"), coo.data,
220223
out_row, out_col, out_data, int(censoring_i))
221224
return sps.csr_matrix((out_data, (out_row, out_col)),
222-
shape=(self._n_intervals,
223-
self._n_output_features))
225+
shape=(n_intervals,
226+
n_output_features))

0 commit comments

Comments
 (0)