213 changes: 213 additions & 0 deletions ITMO_FS/embedded/WeightedEvReg.py
@@ -0,0 +1,213 @@
import math
import numpy as np
import random
from collections import defaultdict

from ..utils import BaseTransformer, apply_cr


class WeightedEvReg(BaseTransformer):
"""
Builds a weighted evidential regression model that learns feature weights during fitting.
The learned feature weights can then be used as ranks in feature selection.

Parameters
----------
cutting_rule : tuple
    Cutting rule applied to the learned weights, e.g. ('K best', 2)
alpha : np.float64
    Learning rate (optional, 0.01 by default)
num_epochs : int
    Number of epochs of gradient descent (optional, 1000 by default)
p : int
    Power of the Minkowski distance (optional, 2 by default)
k : int
    Number of neighbors for the kNN-based optimization (optional, 0.1 * X.shape[0] by default)
radius : np.float64
    Radius of the RBF kernel (optional, 5.0 by default)

Attributes
----------
selected_features_ : numpy array
    Indices of the features selected by the cutting rule applied to the learned weights.

See Also
--------
https://www.researchgate.net/publication/343493691_Feature_Selection_for_Health_Care_Costs_Prediction_Using_Weighted_Evidential_Regression

Note:
The main idea is to use weighted EVREG to predict labels and then optimize the weights with respect to the loss
via gradient descent for a fixed number of epochs. The weights are used when computing distances between objects,
thus weighting each feature's impact on the distance values. By optimizing each feature's impact on the distance,
the algorithm optimizes prediction quality, thereby finding the relationship between features and predictions and
performing feature selection.

Examples
--------
>>> import numpy as np
>>> from ITMO_FS.embedded import WeightedEvReg
>>> X = np.array([[1, 2, 3, 3, 1], [2, 2, 3, 3, 2], [1, 3, 3, 1, 3], [3, 1, 3, 1, 4], [4, 4, 3, 1, 5]], dtype=np.int64)
>>> y = np.array([1, 2, 3, 4, 5], dtype=np.int64)
>>> weighted_ev_reg = WeightedEvReg(cutting_rule=('K best', 2), num_epochs=100)
>>> weighted_ev_reg.fit(X, y)
>>> print(weighted_ev_reg.selected_features_)
[Review comment from Collaborator]: do not print this, and check this test with pytest
"""

def __init__(self, cutting_rule, alpha=0.01, num_epochs=1000, p=2, k=None, radius=5.0):
self.alpha = alpha
self.num_epochs = num_epochs
self.p = p
self.k = k
self.radius = radius
self.cutting_rule = cutting_rule
random.seed(42)

@staticmethod
def __weighted_minkowski_distance(first, second, weights, p):
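# Weighted Minkowski distance: (sum_j |(first_j - second_j) * w_j| ** p) ** (1 / p);
# the weights scale each feature's contribution to the distance.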
return sum(abs((first - second) * weights) ** p) ** (1.0 / p)

@staticmethod
def __rbf(distance, radius):
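# Gaussian RBF kernel: maps a distance to an evidence mass in (0, 1].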
return math.exp(-(distance ** 2) / radius)

def __rbf_vectors(self, first, second, weights, p, radius):
return math.exp(-(self.__weighted_minkowski_distance(first, second, weights, p) ** 2) / radius)

def __count_K(self, X, index, nearest_neighbors, weights, p, radius):
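# Combines the neighbors' evidence masses: distances_without[t] = m_t * prod_{s != t} (1 - m_s)
# is the unnormalized mass focused on neighbor t, distances_minus = prod_t (1 - m_t) is the mass
# left to ignorance, and K is their normalizing sum.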
all_distances = [self.__rbf(self.__weighted_minkowski_distance(X[index], X[t], weights, p), radius) for t in
nearest_neighbors]
distances_minus = np.prod([1 - dist for dist in all_distances])
distances_without = [dist * distances_minus / (1 - dist) for dist in all_distances]
return distances_minus + sum(distances_without), distances_without, distances_minus

@staticmethod
def __elements_number(k_smallest):
return sum(len(t) for t in k_smallest.values())

def __evreg_predict(self, X, y, index, cur_weights, p, k, radius):
to_predict = X[index]
k_smallest = defaultdict(list)
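# k_smallest buckets sample indices by their distance to the query sample;
# only the k nearest samples overall are kept.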
for i in range(X.shape[0]):
if i == index:
continue
cur_distance = self.__weighted_minkowski_distance(to_predict, X[i], cur_weights, p)
if self.__elements_number(k_smallest) == k:
max_smallest = max(k_smallest.keys())
if cur_distance < max_smallest:
del k_smallest[max_smallest][random.randint(0, len(k_smallest[max_smallest]) - 1)]
if not k_smallest[max_smallest]:
del k_smallest[max_smallest]
k_smallest[cur_distance].append(i)
else:
k_smallest[cur_distance].append(i)
nearest_neighbors = [item for sublist in k_smallest.values() for item in sublist]
K, distances_without, m_star = self.__count_K(X, index, nearest_neighbors, cur_weights, p, radius)
m = 1.0 / K * np.array(distances_without)
return sum(m[i] * y[nearest_neighbors[i]] for i in range(len(nearest_neighbors))) + m_star * (
        max(y[nearest_neighbors]) + min(y[nearest_neighbors])) / 2

@staticmethod
def __count_loss(expected_y, predicted_y):
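# Mean squared error between the expected and predicted labels.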
return 1.0 / len(expected_y) * sum((expected_y - predicted_y) ** 2)

@staticmethod
def __minkowski_derivative(first, second, weights, p):
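# Derivative of the weighted Minkowski distance; feeds the chain rule in __rbf_derivative below.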
return sum(abs((first - second) * weights) ** p) ** (1.0 / p - 1) * p / (p - 1) * ((first - second) ** (p - 1))

def __rbf_derivative(self, first, second, weights, p, radius):
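# Chain rule: d/dw exp(-d^2 / radius) = -2 * d / radius * rbf(d) times the distance derivative.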
distance = self.__weighted_minkowski_distance(first, second, weights, p)
return -2.0 / radius * self.__rbf(distance, radius) * distance * self.__minkowski_derivative(first, second,
weights, p)

def __prod_seq_func(self, X, index, skip, weights, p, radius, also_skip=None):
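# Product of (1 - m_i) over all samples except those in skip (and also_skip, if given).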
return np.prod(
[1 - self.__rbf_vectors(X[index], X[i], weights, p, radius) for i in range(X.shape[0]) if
i not in skip and i != also_skip])

def __product_sequence_derivative(self, X, index, skip, weights, p, radius):
return np.sum(
    [self.__rbf_derivative(X[index], X[i], weights, p, radius) *
     self.__prod_seq_func(X, index, skip, weights, p, radius, i)
     for i in range(X.shape[0]) if i not in skip],
    axis=0)

def __K_derivative(self, X, index, weights, p, radius):
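# Derivative of the normalization constant K: product rule over each neighbor's term
# m_j * prod_{s != j} (1 - m_s), plus the derivative of the ignorance term.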
sum_func = lambda skip: self.__rbf_derivative(X[index], X[skip], weights, p, radius) * \
self.__prod_seq_func(X, index, [skip, index], weights, p, radius) + \
self.__rbf_vectors(X[index], X[skip], weights, p, radius) * \
self.__product_sequence_derivative(X, index, [index, skip], weights, p, radius)

return self.__product_sequence_derivative(X, index, [index], weights, p, radius) + np.sum(
[sum_func(i) for i in range(X.shape[0]) if i != index], axis=0)

def __count_K_all(self, X, index, weights, p, radius):
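# Same mass combination as __count_K, but over every other sample instead of only the k nearest.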
all_distances = [self.__rbf(self.__weighted_minkowski_distance(X[index], X[t], weights, p), radius) for t in
range(X.shape[0]) if t != index]
distances_minus = np.prod([1 - dist for dist in all_distances])
distances_without = [dist * distances_minus / (1 - dist) for dist in all_distances]
return distances_minus + sum(distances_without), distances_without, distances_minus

def __single_mass_derivative(self, X, i, j, weights, p, radius):
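# Derivative of the normalized mass assigned to neighbor j (quotient rule over K).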
K, _, distances_minus = self.__count_K_all(X, i, weights, p, radius)
return (K * self.__rbf_derivative(X[i], X[j], weights, p, radius) -
        self.__K_derivative(X, i, weights, p, radius) * self.__rbf_vectors(X[i], X[j], weights, p, radius)) * \
    distances_minus / (K ** 2) + \
    self.__rbf_vectors(X[i], X[j], weights, p, radius) / K * \
    self.__product_sequence_derivative(X, i, [i], weights, p, radius)

def __mass_star_derivative(self, X, i, weights, p, radius):
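# Derivative of the ignorance mass m* = prod (1 - m_s) / K (quotient rule over K).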
K, _, distances_minus = self.__count_K_all(X, i, weights, p, radius)
return (K * self.__product_sequence_derivative(X, i, [i], weights, p, radius) -
self.__K_derivative(X, i, weights, p, radius) * distances_minus) / (K ** 2)

def __y_derivative(self, X, i, weights, p, radius, y):
# Derivative of the prediction for sample i with respect to the feature weights.
y_der = np.zeros(len(weights))
y_lab = [y[j] for j in range(len(y)) if j != i]
for j in range(X.shape[0]):
if j == i:
continue
y_der += self.__single_mass_derivative(X, i, j, weights, p, radius) * y[j]
y_der += self.__mass_star_derivative(X, i, weights, p, radius) * (max(y_lab) + min(y_lab)) / 2
return y_der

def __update_weights(self, X, y, alpha, weights, p, radius):
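# One gradient-descent step on the feature weights with step size alpha.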
return weights - alpha * 2.0 / X.shape[0] * \
    np.sum([self.__y_derivative(X, i, weights, p, radius, y) for i in range(X.shape[0])], axis=0)

def _fit(self, X, y):
"""
Runs the weighted evidential regression algorithm on the specified dataset.

Parameters
----------
X : array-like, shape (n_samples, n_features)
The input samples.
y : array-like, shape (n_samples)
The classes for the samples.

Returns
-------
None
"""
if self.k is None:
self.k = int(0.1 * X.shape[0])
if self.k < 1:
self.k = X.shape[0] - 1
print(self.k)
[Review comment from Collaborator]: This should go to log or get deleted

feature_size = X.shape[1]
best_weights = np.ones(feature_size, dtype=np.float64)
min_loss = float('inf')
cur_weights = best_weights.copy()
for _ in range(self.num_epochs):
predicted_y = [self.__evreg_predict(X, y, i, cur_weights, self.p, self.k, self.radius)
               for i in range(X.shape[0])]
cur_loss = self.__count_loss(y, np.array(predicted_y))
if cur_loss < min_loss:
min_loss = cur_loss
best_weights = cur_weights.copy()
cur_weights = self.__update_weights(X, y, self.alpha, cur_weights, self.p, self.radius)
cutting_rule = apply_cr(self.cutting_rule)

self.selected_features_ = cutting_rule(dict(enumerate(best_weights)))
1 change: 1 addition & 0 deletions ITMO_FS/embedded/__init__.py
@@ -1 +1,2 @@
from .MOS import MOS
from .WeightedEvReg import WeightedEvReg
3 changes: 0 additions & 3 deletions ITMO_FS/ensembles/model_based/best_sum.py
@@ -1,8 +1,5 @@
import numpy as np
from sklearn.utils import check_array
from ...utils import BaseTransformer, generate_features, apply_cr
from ...filters.univariate.measures import GLOB_CR, GLOB_MEASURE
from sklearn.utils.validation import check_is_fitted


class BestSum(BaseTransformer): ## TODO refactor , not stable
4 changes: 2 additions & 2 deletions ITMO_FS/filters/univariate/UnivariateFilter.py
@@ -1,6 +1,6 @@
from numpy import ndarray

from .measures import GLOB_CR, GLOB_MEASURE
from .measures import GLOB_MEASURE
from ...utils import BaseTransformer, generate_features, check_restrictions, apply_cr


@@ -30,7 +30,7 @@ class UnivariateFilter(BaseTransformer): # TODO ADD LOGGING
--------

>>> from sklearn.datasets import make_classification
>>> from ITMO_FS.filters.univariate import select_k_best
>>> from ITMO_FS.utils import select_k_best
>>> from ITMO_FS.filters.univariate import UnivariateFilter
>>> from ITMO_FS.filters.univariate import f_ratio_measure
>>> x, y = make_classification(1000, 100, n_informative = 10, n_redundant = 30, \
3 changes: 1 addition & 2 deletions ITMO_FS/filters/univariate/__init__.py
@@ -2,8 +2,7 @@
from .VDM import VDM
from .measures import anova, fit_criterion_measure, f_ratio_measure, gini_index, su_measure, modified_t_score, fechner_corr, \
information_gain, reliefF_measure, chi2_measure, spearman_corr, pearson_corr, laplacian_score, qpfs_filter, \
kendall_corr, select_k_best, select_k_worst, select_worst_by_value, select_best_by_value, select_best_percentage,\
select_worst_percentage
kendall_corr
from .NDFS import NDFS
from .RFS import RFS
from .SPEC import SPEC
73 changes: 1 addition & 72 deletions ITMO_FS/filters/univariate/measures.py
@@ -407,6 +407,7 @@ def reliefF_measure(X, y, k_neighbors=1):
with np.errstate(divide='ignore', invalid="ignore"): # todo
return f_ratios / (np.amax(X, axis=0) - np.amin(X, axis=0))


def relief_measure(X, y, m=None):
"""
Computes Relief measure for each feature.
@@ -947,78 +948,6 @@ def modified_t_score(X, y):
"Relief": relief_measure}


def select_best_by_value(value):
return _wrapped_partial(__select_by_value, value=value, more=True)


def select_worst_by_value(value):
return _wrapped_partial(__select_by_value, value=value, more=False)


def __select_by_value(scores, value, more=True):
features = []
for key, sc_value in scores.items():
if more:
if sc_value >= value:
features.append(key)
else:
if sc_value <= value:
features.append(key)
return features


def select_k_best(k):
return _wrapped_partial(__select_k, k=k, reverse=True)


def select_k_worst(k):
return _wrapped_partial(__select_k, k=k)


def __select_k(scores, k, reverse=False):
if type(k) != int:
raise TypeError("Number of features should be integer")
if k > len(scores):
raise ValueError("Cannot select %d features with n_features = %d" % (k, len(scores)))
return [keys[0] for keys in sorted(scores.items(), key=lambda kv: kv[1], reverse=reverse)[:k]]


def __select_percentage_best(scores, percent):
features = []
max_val = max(scores.values())
threshold = max_val * percent
for key, sc_value in scores.items():
if sc_value >= threshold:
features.append(key)
return features


def select_best_percentage(percent):
return _wrapped_partial(__select_percentage_best, percent=percent)


def __select_percentage_worst(scores, percent):
features = []
max_val = min(scores.values())
threshold = max_val * percent
for key, sc_value in scores.items():
if sc_value >= threshold:
features.append(key)
return features


def select_worst_percentage(percent):
return _wrapped_partial(__select_percentage_worst, percent=percent)


GLOB_CR = {"Best by value": select_best_by_value,
"Worst by value": select_worst_by_value,
"K best": select_k_best,
"K worst": select_k_worst,
"Worst by percentage": select_worst_percentage,
"Best by percentage": select_best_percentage}


def qpfs_filter(X, y, r=None, sigma=None, solv='quadprog', fn=pearson_corr):
"""
Performs Quadratic Programming Feature Selection algorithm.
2 changes: 2 additions & 0 deletions ITMO_FS/utils/__init__.py
@@ -4,3 +4,5 @@
from .qpfs_body import qpfs_body
from .base_transformer import BaseTransformer
from .base_wrapper import BaseWrapper
from .cutting_rules import select_k_best, select_k_worst, select_worst_by_value, select_best_by_value, select_best_percentage,\
select_worst_percentage