// License: BSD 3 clause

//
// Created by Maryan Morel on 15/05/2017.
//

#include <mutex>
#include "tick/preprocessing/longitudinal_features_lagger_mp.h"

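// Builds the lagger: stores n_intervals and the per-feature lag array, derives
// n_features and n_lagged_features, then precomputes the column offsets and the
// number of output columns.
// Note: _n_lags is dereferenced in the initializer list, so it must be non-null;
// the nullptr guard below only protects the call to compute_col_offset.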
LongitudinalFeaturesLagger_MP::LongitudinalFeaturesLagger_MP(ulong n_intervals,
                                                             SArrayULongPtr _n_lags, size_t n_jobs)
    : LongitudinalPreprocessor(n_jobs),
      n_intervals(n_intervals),
      n_lags(_n_lags),
      n_features(_n_lags->size()),
      n_lagged_features(_n_lags->size() + _n_lags->sum()) {
  if (n_lags != nullptr) compute_col_offset(n_lags);
  n_output_features = get_n_output_features();
}

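// Computes the first output column of each feature in the lagged matrix:
// col_offset[i] = col_offset[i - 1] + n_lags[i - 1] + 1, i.e. each feature
// occupies (n_lags[feature] + 1) consecutive columns.
// Note: the loop starts at i = 1, so (*n_lags)[0] itself is never range-checked.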
void LongitudinalFeaturesLagger_MP::compute_col_offset(const SArrayULongPtr n_lags) {
  ArrayULong col_offset_temp = ArrayULong(n_lags->size());
  col_offset_temp.init_to_zero();
  for (ulong i(1); i < n_lags->size(); i++) {
    if ((*n_lags)[i] > n_intervals) {
      // TODO: should this bound be strict ((*n_lags)[i] >= n_intervals)? A lag equal to
      // n_intervals always produces an all-zero column.
      TICK_ERROR("n_lags elements must be between 0 and n_intervals.");
    }
    col_offset_temp[i] = col_offset_temp[i - 1] + (*n_lags)[i - 1] + 1;
  }
  col_offset = col_offset_temp.as_sarray_ptr();
}

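// Fills `out` with the lagged version of a dense exposure matrix: every non-zero
// value found at (row, feature) is replicated at (row, col_offset[feature]),
// (row + 1, col_offset[feature] + 1), ... up to lag n_lags[feature], truncated
// at the censoring interval.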
void LongitudinalFeaturesLagger_MP::dense_lag_preprocessor(ArrayDouble2d &features,
                                                           ArrayDouble2d &out,
                                                           ulong censoring) const {
  if (n_intervals != features.n_rows()) {
    TICK_ERROR("Features matrix row count should match n_intervals.");
  }
  if (n_features != features.n_cols()) {
    TICK_ERROR("Features matrix column count should match n_lags length.");
  }
  if (out.n_cols() != n_lagged_features) {
    TICK_ERROR("Column count of out should be equal to n_features + sum(n_lags).");
  }
  if (out.n_rows() != n_intervals) {
    TICK_ERROR("Row count of out is inconsistent with n_intervals.");
  }
  ulong n_cols_feature, row, col, max_col;
  double value;
  for (ulong feature = 0; feature < n_features; feature++) {
    n_cols_feature = (*n_lags)[feature] + 1;
    for (ulong j = 0; j < n_intervals; j++) {
      row = j;
      col = (*col_offset)[feature];
      // use view_row instead of operator()(row, feature) so the method can stay const
      value = view_row(features, row)[feature];
      max_col = col + n_cols_feature;
      if (value != 0) {
        while (row < censoring && col < max_col) {
          out[row * n_lagged_features + col] = value;
          row++;
          col++;
        }
      }
    }
  }
}

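// Same lagging operation as dense_lag_preprocessor, but on COO triplets: each
// input entry (row, col, value) generates up to n_lags[col] + 1 output triplets,
// one per lag, truncated at the censoring interval. Unused slots in the
// preallocated output arrays stay at zero and are dropped later (see sparse_lagger).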
void LongitudinalFeaturesLagger_MP::sparse_lag_preprocessor(ArrayULong &row, ArrayULong &col,
                                                            ArrayDouble &data, ArrayULong &out_row,
                                                            ArrayULong &out_col,
                                                            ArrayDouble &out_data,
                                                            ulong censoring) const {
  // TODO: add checks here, or do them in Python?
  if (row.size() != col.size() || col.size() != data.size())
    TICK_ERROR("row, col and data arrays should have the same size (COO matrix)");
  if (out_row.size() != out_col.size() || out_col.size() != out_data.size())
    TICK_ERROR("out_row, out_col and out_data arrays should have the same size (COO matrix)");

  ulong j(0), r, c, offset, new_col, max_col;
  double value;

  for (ulong i = 0; i < data.size(); i++) {
    value = data[i];
    r = row[i];
    c = col[i];
    offset = (*col_offset)[c];
    max_col = offset + (*n_lags)[c] + 1;
    new_col = offset;

    while (r < censoring && new_col < max_col) {
      out_row[j] = r;
      out_col[j] = new_col;
      out_data[j] = value;
      r++;
      new_col++;
      j++;
    }
  }
}

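// Number of columns of the lagged matrix: one column per feature and per lag,
// i.e. the sum over features of (n_lags[feature] + 1).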
ulong LongitudinalFeaturesLagger_MP::get_n_output_features() {
  ulong n_output = 0;
  for (size_t i = 0; i < n_lags->size(); i++) n_output += (*n_lags)[i] + 1;
  return n_output;
}

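// Lags a single sparse observation matrix: converts it to COO triplets, applies
// sparse_lag_preprocessor into preallocated arrays, then rebuilds a sparse matrix
// of shape (n_intervals, n_output_features).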
SSparseArrayDouble2dPtr LongitudinalFeaturesLagger_MP::sparse_lagger(
    SSparseArrayDouble2dPtr &feature_matrix, ulong censoring_i) {
  if (censoring_i > n_intervals || censoring_i < 1)
    TICK_ERROR("censoring should be an integer in [1, n_intervals]");

  CooMatrix<double> coo(feature_matrix);

  // TODO: estimated_nnz is a loose upper bound (each input entry yields at most
  // max(n_lags) + 1 output entries); the unused zero entries are removed by coo.toSparse.
  ulong estimated_nnz = coo.nnz * n_output_features;

  // std::cout << "estimated nnz : " << estimated_nnz << " nnz : " << coo.nnz
  //           << " arraysum : " << n_output_features << std::endl;

  ArrayULong out_row(estimated_nnz);
  ArrayULong out_col(estimated_nnz);
  ArrayDouble out_data(estimated_nnz);

  out_row.init_to_zero();
  out_col.init_to_zero();
  out_data.init_to_zero();

  sparse_lag_preprocessor(coo.rows, coo.cols, coo.data, out_row, out_col, out_data, censoring_i);

  coo.rows = out_row;
  coo.cols = out_col;
  coo.data = out_data;

  return coo.toSparse(n_intervals, n_output_features);
}

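// Worker executed by each thread on its chunk of dense matrices; results are
// appended to the shared `output` vector under `thread_mutex`.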
void LongitudinalFeaturesLagger_MP::transform_thread_dense(
    std::vector<ArrayDouble2d> splited_features, std::vector<ArrayDouble2d> &output,
    std::mutex &thread_mutex, std::vector<ulong> splited_censoring) {
  for (ulong i = 0; i < splited_features.size(); i++) {
    ArrayDouble2d transformed(splited_features[i].n_rows(), n_output_features);
    transformed.init_to_zero();
    dense_lag_preprocessor(splited_features[i], transformed, splited_censoring[i]);
    thread_mutex.lock();  // required: output is shared between the worker threads
    output.push_back(transformed);
    thread_mutex.unlock();
  }
}

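// Sparse counterpart of transform_thread_dense: lags each sparse matrix of the
// chunk and appends the result to the shared `output` vector under `thread_mutex`.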
void LongitudinalFeaturesLagger_MP::transform_thread_sparse(
    std::vector<SSparseArrayDouble2dPtr> splited_features,
    std::vector<SSparseArrayDouble2dPtr> &output, std::mutex &thread_mutex,
    std::vector<ulong> splited_censoring) {
  for (ulong i = 0; i < splited_features.size(); i++) {
    SSparseArrayDouble2dPtr transformed = sparse_lagger(splited_features[i], splited_censoring[i]);
    thread_mutex.lock();  // required: output is shared between the worker threads
    output.push_back(transformed);
    thread_mutex.unlock();
  }
}

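// Lags a list of dense observation matrices, dispatching the work over at most
// n_jobs threads. An empty `censoring` vector means no censoring (every entry is
// set to n_intervals). Note that results are appended in thread completion order,
// so the output order may differ from the input order when several threads run.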
std::vector<ArrayDouble2d> LongitudinalFeaturesLagger_MP::transform(
    std::vector<ArrayDouble2d> features, std::vector<ulong> censoring) {
  if (features.empty()) TICK_ERROR("features is empty");

  if (censoring.empty()) {
    for (ulong i = 0; i < features.size(); i++) censoring.push_back(n_intervals);
  }

  if (features.size() != censoring.size())
    TICK_ERROR("features size and censoring size do not match");

  std::pair<ulong, ulong> base_shape = {features[0].n_rows(), features[0].n_cols()};
  for (const ArrayDouble2d &f : features)
    if (f.n_rows() != base_shape.first || f.n_cols() != base_shape.second)
      TICK_ERROR("All the elements of features should have the same shape");

  size_t thread_count = std::min((size_t)features.size(), n_jobs);
  std::vector<std::vector<ArrayDouble2d>> splited_features = split_vector(features, thread_count);
  features.clear();
  std::vector<std::vector<ulong>> splited_censoring = split_vector(censoring, thread_count);
  censoring.clear();

  if (splited_features.size() != splited_censoring.size())
    TICK_ERROR("Unexpected error: splited_features.size() != splited_censoring.size()");
  if (splited_features.size() != thread_count || splited_censoring.size() != thread_count)
    TICK_ERROR(
        "Unexpected error: splited_features.size() != thread_count || splited_censoring.size() "
        "!= thread_count");
  if (splited_features.empty() || splited_censoring.empty())
    TICK_ERROR("Unexpected error: splited_features.empty() || splited_censoring.empty()");

  std::vector<ArrayDouble2d> output;
  std::vector<std::thread> threads;
  std::mutex thread_mutex;

  for (size_t i = 0; i < thread_count; i++)
    threads.push_back(std::thread(&LongitudinalFeaturesLagger_MP::transform_thread_dense, this,
                                  splited_features[i], std::ref(output), std::ref(thread_mutex),
                                  splited_censoring[i]));

  splited_features.clear();
  splited_censoring.clear();

  for (size_t i = 0; i < threads.size(); i++) threads[i].join();

  return output;
}

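// Sparse counterpart of the dense transform. Unlike the dense version, it also
// resets n_intervals from the shape of the first matrix before checking that all
// matrices share that shape. As in the dense case, results are appended in thread
// completion order.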
std::vector<SSparseArrayDouble2dPtr> LongitudinalFeaturesLagger_MP::transform(
    std::vector<SSparseArrayDouble2dPtr> features, std::vector<ulong> censoring) {
  if (features.empty()) TICK_ERROR("features is empty");

  if (censoring.empty()) {
    for (ulong i = 0; i < features.size(); i++) censoring.push_back(n_intervals);
  }

  if (features.size() != censoring.size())
    TICK_ERROR("features size and censoring size do not match");

  std::pair<ulong, ulong> base_shape = {features[0]->n_rows(), features[0]->n_cols()};
  n_intervals = base_shape.first;
  for (const SSparseArrayDouble2dPtr &f : features)
    if (f->n_rows() != base_shape.first || f->n_cols() != base_shape.second)
      TICK_ERROR("All the elements of features should have the same shape");

  size_t thread_count = std::min((size_t)features.size(), n_jobs);
  std::vector<std::vector<SSparseArrayDouble2dPtr>> splited_features =
      split_vector(features, thread_count);
  features.clear();
  std::vector<std::vector<ulong>> splited_censoring = split_vector(censoring, thread_count);
  censoring.clear();

  if (splited_features.size() != splited_censoring.size())
    TICK_ERROR("Unexpected error: splited_features.size() != splited_censoring.size()");
  if (splited_features.empty() || splited_censoring.empty())
    TICK_ERROR("Unexpected error: splited_features.empty() || splited_censoring.empty()");

  std::vector<SSparseArrayDouble2dPtr> output;
  std::vector<std::thread> threads;
  std::mutex thread_mutex;

  for (size_t i = 0; i < thread_count; i++)
    threads.push_back(std::thread(&LongitudinalFeaturesLagger_MP::transform_thread_sparse, this,
                                  splited_features[i], std::ref(output), std::ref(thread_mutex),
                                  splited_censoring[i]));

  splited_features.clear();
  splited_censoring.clear();

  for (size_t i = 0; i < threads.size(); i++) threads[i].join();

  return output;
}