77 as _LongitudinalFeaturesLagger
88from .utils import check_longitudinal_features_consistency ,\
99 check_censoring_consistency
10+ from multiprocessing .pool import Pool
1011
1112
1213class LongitudinalFeaturesLagger (LongitudinalPreprocessor ):
@@ -83,7 +84,7 @@ class LongitudinalFeaturesLagger(LongitudinalPreprocessor):
8384 }
8485 }
8586
86- def __init__ (self , n_lags , n_jobs = - 1 ):
87+ def __init__ (self , n_lags , n_jobs = 1 ):
8788 LongitudinalPreprocessor .__init__ (self , n_jobs = n_jobs )
8889 if not isinstance (n_lags , np .ndarray ) or n_lags .dtype != 'uint64' :
8990 raise ValueError (
@@ -166,7 +167,6 @@ def transform(self, features, labels=None, censoring=None):
166167 output : `[numpy.ndarrays]` or `[csr_matrices]`, shape=(n_intervals, n_features)
167168 The list of features matrices with added lagged features.
168169 """
169-
170170 n_samples = len (features )
171171 if censoring is None :
172172 censoring = np .full ((n_samples ,), self ._n_intervals ,
@@ -176,16 +176,28 @@ def transform(self, features, labels=None, censoring=None):
176176 features = check_longitudinal_features_consistency (
177177 features , base_shape , "float64" )
178178 if sps .issparse (features [0 ]):
179- X_with_lags = [
180- self ._sparse_lagger (x , int (censoring [i ]))
181- for i , x in enumerate (features )
182- ]
183- # TODO: Don't get why int() is required here as censoring_i is uint64
179+ if self .n_jobs > 1 :
180+ with Pool (self .n_jobs ) as pool :
181+ X_with_lags = pool .starmap (self ._sparse_lagger , zip (features , censoring ))
182+ pool .start ()
183+ pool .join ()
184+ else :
185+ X_with_lags = [
186+ self ._sparse_lagger (x , int (censoring [i ]))
187+ for i , x in enumerate (features )
188+ ]
189+ # TODO: Don't get why int() is required here as censoring_i is uint64
184190 else :
185- X_with_lags = [
186- self ._dense_lagger (x , int (censoring [i ]))
187- for i , x in enumerate (features )
188- ]
191+ if self .n_jobs > 1 :
192+ with Pool (self .n_jobs ) as pool :
193+ X_with_lags = pool .starmap (self ._dense_lagger , zip (features , censoring ))
194+ pool .start ()
195+ pool .join ()
196+ else :
197+ X_with_lags = [
198+ self ._dense_lagger (x , int (censoring [i ]))
199+ for i , x in enumerate (features )
200+ ]
189201
190202 return X_with_lags , labels , censoring
191203
@@ -197,14 +209,15 @@ def _dense_lagger(self, feature_matrix, censoring_i):
197209 return output
198210
199211 def _sparse_lagger (self , feature_matrix , censoring_i ):
212+ pp = self ._cpp_preprocessor
200213 coo = feature_matrix .tocoo ()
201214 estimated_nnz = coo .nnz * int ((self .n_lags + 1 ).sum ())
202215 out_row = np .zeros ((estimated_nnz ,), dtype = "uint64" )
203216 out_col = np .zeros ((estimated_nnz ,), dtype = "uint64" )
204217 out_data = np .zeros ((estimated_nnz ,), dtype = "float64" )
205- self . _cpp_preprocessor .sparse_lag_preprocessor (
218+ pp .sparse_lag_preprocessor (
206219 coo .row .astype ("uint64" ), coo .col .astype ("uint64" ), coo .data ,
207- out_row , out_col , out_data , censoring_i )
220+ out_row , out_col , out_data , int ( censoring_i ) )
208221 return sps .csr_matrix ((out_data , (out_row , out_col )),
209222 shape = (self ._n_intervals ,
210223 self ._n_output_features ))
0 commit comments