35 changes: 25 additions & 10 deletions dask_glm/algorithms.py
@@ -87,6 +87,7 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs):
Returns
-------
beta : array-like, shape (n_features,)
n_iter : number of iterations executed
"""
loglike, gradient = family.loglike, family.gradient
n, p = X.shape
@@ -98,6 +99,7 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs):
recalcRate = 10
backtrackMult = firstBacktrackMult
beta = np.zeros(p)
n_iter = 0

for k in range(max_iter):
# how necessary is this recalculation?
@@ -125,6 +127,8 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs):
beta = beta - stepSize * grad # tiny bit of repeat work here to avoid communication
Xbeta = Xbeta - stepSize * Xgradient

n_iter += 1

if stepSize == 0:
break

@@ -136,7 +140,7 @@ def gradient_descent(X, y, max_iter=100, tol=1e-14, family=Logistic, **kwargs):
stepSize *= stepGrowth
backtrackMult = nextBacktrackMult

return beta
return beta, n_iter


@normalize
@@ -158,13 +162,14 @@ def newton(X, y, max_iter=50, tol=1e-8, family=Logistic, **kwargs):
Returns
-------
beta : array-like, shape (n_features,)
n_iter : number of iterations executed
"""
gradient, hessian = family.gradient, family.hessian
n, p = X.shape
beta = np.zeros(p) # always init to zeros?
Xbeta = dot(X, beta)

iter_count = 0
n_iter = 0
converged = False

while not converged:
@@ -181,17 +186,16 @@ def newton(X, y, max_iter=50, tol=1e-8, family=Logistic, **kwargs):
step, _, _, _ = np.linalg.lstsq(hess, grad)
beta = (beta_old - step)

iter_count += 1

# should change this criterion
coef_change = np.absolute(beta_old - beta)
converged = (
(not np.any(coef_change > tol)) or (iter_count > max_iter))
(not np.any(coef_change > tol)) or (n_iter >= max_iter))

if not converged:
Xbeta = dot(X, beta) # numpy -> dask conversion of beta
n_iter += 1

return beta
return beta, n_iter


@normalize
@@ -217,6 +221,7 @@ def admm(X, y, regularizer='l1', lamduh=0.1, rho=1, over_relax=1,
Returns
-------
beta : array-like, shape (n_features,)
n_iter : number of iterations executed
"""
pointwise_loss = family.pointwise_loss
pointwise_gradient = family.pointwise_gradient
@@ -256,6 +261,8 @@ def wrapped(beta, X, y, z, u, rho):
u = np.array([np.zeros(p) for i in range(nchunks)])
betas = np.array([np.ones(p) for i in range(nchunks)])

n_iter = 0

for k in range(max_iter):

# x-update step
@@ -283,10 +290,12 @@ def wrapped(beta, X, y, z, u, rho):
eps_dual = np.sqrt(p * nchunks) * abstol + \
reltol * np.linalg.norm(rho * u)

n_iter += 1

if primal_res < eps_pri and dual_res < eps_dual:
break

return z
return z, n_iter


def local_update(X, y, beta, z, u, rho, f, fprime, solver=fmin_l_bfgs_b):
@@ -326,6 +335,7 @@ def lbfgs(X, y, regularizer=None, lamduh=1.0, max_iter=100, tol=1e-4,
Returns
-------
beta : array-like, shape (n_features,)
n_iter : number of iterations executed
"""
pointwise_loss = family.pointwise_loss
pointwise_gradient = family.pointwise_gradient
@@ -349,7 +359,7 @@ def compute_loss_grad(beta, X, y):
args=(X, y),
iprint=(verbose > 0) - 1, pgtol=tol, maxiter=max_iter)

return beta
return beta, info['nit']


@normalize
@@ -375,6 +385,7 @@ def proximal_grad(X, y, regularizer='l1', lamduh=0.1, family=Logistic,
Returns
-------
beta : array-like, shape (n_features,)
n_iter : number of iterations executed
"""
n, p = X.shape
firstBacktrackMult = 0.1
@@ -387,6 +398,8 @@ def proximal_grad(X, y, regularizer='l1', lamduh=0.1, family=Logistic,
beta = np.zeros(p)
regularizer = Regularizer.get(regularizer)

n_iter = 0

for k in range(max_iter):
# Compute the gradient
if k % recalcRate == 0:
@@ -400,6 +413,8 @@ def proximal_grad(X, y, regularizer='l1', lamduh=0.1, family=Logistic,

obeta = beta

n_iter += 1

# Compute the step size
lf = func
for ii in range(100):
@@ -426,9 +441,9 @@ def proximal_grad(X, y, regularizer='l1', lamduh=0.1, family=Logistic,

# L2-regularization returned a dask-array
try:
return beta.compute()
return beta.compute(), n_iter
except AttributeError:
return beta
return beta, n_iter


_solvers = {
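Every public solver in algorithms.py now returns a (beta, n_iter) pair instead of a bare coefficient array, so existing callers need to unpack two values. A minimal sketch of the updated calling convention; the toy dataset below is made up for illustration and is not part of this PR:

import dask.array as da
from dask_glm.algorithms import gradient_descent

# Hypothetical toy problem: 1000 samples, 5 features, split into 10 chunks.
X = da.random.random((1000, 5), chunks=(100, 5))
y = (da.random.random(1000, chunks=(100,)) > 0.5).astype('float64')

# gradient_descent (like newton, admm, lbfgs and proximal_grad) now returns
# the fitted coefficients together with the number of iterations executed.
beta, n_iter = gradient_descent(X, y, max_iter=100, tol=1e-7)
assert 0 < n_iter <= 100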
2 changes: 1 addition & 1 deletion dask_glm/estimators.py
@@ -63,7 +63,7 @@ def __init__(self, fit_intercept=True, solver='admm', regularizer='l2',

def fit(self, X, y=None):
X_ = self._maybe_add_intercept(X)
self._coef = algorithms._solvers[self.solver](X_, y, **self._fit_kwargs)
self._coef, self.n_iter_ = algorithms._solvers[self.solver](X_, y, **self._fit_kwargs)

if self.fit_intercept:
self.coef_ = self._coef[:-1]
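Because fit now unpacks the solver's second return value, fitted estimators expose the iteration count as a scikit-learn-style n_iter_ attribute. A short sketch, assuming the existing LogisticRegression estimator (and that it accepts max_iter as a constructor argument, as in the current signature) together with the toy X, y from the sketch above:

from dask_glm.estimators import LogisticRegression

est = LogisticRegression(solver='admm', max_iter=250)
est.fit(X, y)

# n_iter_ is populated by fit() from the solver's (beta, n_iter) return value.
print(est.coef_, est.n_iter_)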
4 changes: 3 additions & 1 deletion dask_glm/tests/test_admm.py
@@ -47,11 +47,13 @@ def wrapped(beta, X, y, z, u, rho):
@pytest.mark.parametrize('nchunks', [5, 10])
@pytest.mark.parametrize('p', [1, 5, 10])
def test_admm_with_large_lamduh(N, p, nchunks):
max_iter = 500
X = da.random.random((N, p), chunks=(N // nchunks, p))
beta = np.random.random(p)
y = make_y(X, beta=np.array(beta), chunks=(N // nchunks,))

X, y = persist(X, y)
z = admm(X, y, regularizer=L1(), lamduh=1e5, rho=20, max_iter=500)
z, n_iter = admm(X, y, regularizer=L1(), lamduh=1e5, rho=20, max_iter=max_iter)

assert np.allclose(z, np.zeros(p), atol=1e-4)
assert n_iter > 0 and n_iter <= max_iter
36 changes: 23 additions & 13 deletions dask_glm/tests/test_algos_families.py
@@ -48,7 +48,7 @@ def make_intercept_data(N, p, seed=20009):
(95, 6, 70605)])
def test_methods(N, p, seed, opt):
X, y = make_intercept_data(N, p, seed=seed)
coefs = opt(X, y)
coefs, _ = opt(X, y)
p = sigmoid(X.dot(coefs).compute())

y_sum = y.compute().sum()
@@ -57,9 +57,9 @@ def test_methods(N, p, seed, opt):


@pytest.mark.parametrize('func,kwargs', [
(newton, {'tol': 1e-5}),
(lbfgs, {'tol': 1e-8}),
(gradient_descent, {'tol': 1e-7}),
(newton, {'tol': 1e-5, 'max_iter': 50}),
(lbfgs, {'tol': 1e-8, 'max_iter': 100}),
(gradient_descent, {'tol': 1e-7, 'max_iter': 100}),
])
@pytest.mark.parametrize('N', [1000])
@pytest.mark.parametrize('nchunks', [1, 10])
@@ -72,18 +72,20 @@ def test_basic_unreg_descent(func, kwargs, N, nchunks, family):

X, y = persist(X, y)

result = func(X, y, family=family, **kwargs)
result, n_iter = func(X, y, family=family, **kwargs)
test_vec = np.random.normal(size=2)

opt = family.pointwise_loss(result, X, y).compute()
test_val = family.pointwise_loss(test_vec, X, y).compute()

max_iter = kwargs['max_iter']
assert n_iter > 0 and n_iter <= max_iter
assert opt < test_val


@pytest.mark.parametrize('func,kwargs', [
(admm, {'abstol': 1e-4}),
(proximal_grad, {'tol': 1e-7}),
(admm, {'abstol': 1e-4, 'max_iter': 250}),
(proximal_grad, {'tol': 1e-7, 'max_iter': 100}),
])
@pytest.mark.parametrize('N', [1000])
@pytest.mark.parametrize('nchunks', [1, 10])
@@ -98,14 +100,16 @@ def test_basic_reg_descent(func, kwargs, N, nchunks, family, lam, reg):

X, y = persist(X, y)

result = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
result, n_iter = func(X, y, family=family, lamduh=lam, regularizer=reg, **kwargs)
test_vec = np.random.normal(size=2)

f = reg.add_reg_f(family.pointwise_loss, lam)

opt = f(result, X, y).compute()
test_val = f(test_vec, X, y).compute()

max_iter = kwargs['max_iter']
assert n_iter > 0 and n_iter <= max_iter
assert opt < test_val


@@ -124,15 +128,18 @@ def test_determinism(func, kwargs, scheduler):
X, y = make_intercept_data(1000, 10)

with dask.config.set(scheduler=scheduler):
a = func(X, y, **kwargs)
b = func(X, y, **kwargs)
a, n_iter_a = func(X, y, **kwargs)
b, n_iter_b = func(X, y, **kwargs)

max_iter = kwargs['max_iter']
assert n_iter_a > 0 and n_iter_a <= max_iter
assert n_iter_b > 0 and n_iter_b <= max_iter
assert (a == b).all()


try:
from distributed import Client
from distributed.utils_test import cluster, loop # flake8: noqa
from distributed.utils_test import cluster, loop # noqa
except ImportError:
pass
else:
@@ -147,7 +154,10 @@ def test_determinism_distributed(func, kwargs, loop):
with Client(s['address'], loop=loop) as c:
X, y = make_intercept_data(1000, 10)

a = func(X, y, **kwargs)
b = func(X, y, **kwargs)
a, n_iter_a = func(X, y, **kwargs)
b, n_iter_b = func(X, y, **kwargs)

max_iter = kwargs['max_iter']
assert n_iter_a > 0 and n_iter_a <= max_iter
assert n_iter_b > 0 and n_iter_b <= max_iter
assert (a == b).all()
16 changes: 8 additions & 8 deletions dask_glm/tests/test_utils.py
@@ -9,41 +9,41 @@
def test_normalize_normalizes():
@utils.normalize
def do_nothing(X, y):
return np.array([0.0, 1.0, 2.0])
return np.array([0.0, 1.0, 2.0]), None
X = da.from_array(np.array([[1, 0, 0], [1, 2, 2]]), chunks=(2, 3))
y = da.from_array(np.array([0, 1, 0]), chunks=(3, ))
res = do_nothing(X, y)
res, n_iter = do_nothing(X, y)
np.testing.assert_equal(res, np.array([-3.0, 1.0, 2.0]))


def test_normalize_doesnt_normalize():
@utils.normalize
def do_nothing(X, y):
return np.array([0.0, 1.0, 2.0])
return np.array([0.0, 1.0, 2.0]), None
X = da.from_array(np.array([[1, 0, 0], [1, 2, 2]]), chunks=(2, 3))
y = da.from_array(np.array([0, 1, 0]), chunks=(3, ))
res = do_nothing(X, y, normalize=False)
res, n_iter = do_nothing(X, y, normalize=False)
np.testing.assert_equal(res, np.array([0, 1, 2]))


def test_normalize_normalizes_if_intercept_not_present():
@utils.normalize
def do_nothing(X, y):
return np.array([0.0, 1.0, 2.0])
return np.array([0.0, 1.0, 2.0]), None
X = da.from_array(np.array([[1, 0, 0], [3, 9.0, 2]]), chunks=(2, 3))
y = da.from_array(np.array([0, 1, 0]), chunks=(3, ))
res = do_nothing(X, y)
res, n_iter = do_nothing(X, y)
np.testing.assert_equal(res, np.array([0, 1 / 4.5, 2]))


def test_normalize_raises_if_multiple_constants():
@utils.normalize
def do_nothing(X, y):
return np.array([0.0, 1.0, 2.0])
return np.array([0.0, 1.0, 2.0]), None
X = da.from_array(np.array([[1, 2, 3], [1, 2, 3]]), chunks=(2, 3))
y = da.from_array(np.array([0, 1, 0]), chunks=(3, ))
with pytest.raises(ValueError):
res = do_nothing(X, y)
res, n_iter = do_nothing(X, y)


def test_add_intercept():
5 changes: 3 additions & 2 deletions dask_glm/utils.py
@@ -23,10 +23,11 @@ def normalize_inputs(X, y, *args, **kwargs):
std[intercept_idx] = 1
mean = mean if len(intercept_idx[0]) else np.zeros(mean.shape)
Xn = (X - mean) / std
out = algo(Xn, y, *args, **kwargs).copy()
out, n_iter = algo(Xn, y, *args, **kwargs)
out = out.copy()
i_adj = np.sum(out * mean / std)
out[intercept_idx] -= i_adj
return out / std
return out / std, n_iter
else:
return algo(X, y, *args, **kwargs)
return normalize_inputs
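Since the normalize decorator now unpacks and re-wraps the (beta, n_iter) pair, anything decorated with @normalize has to follow the same two-value contract, as the updated test_utils.py stubs do. A minimal sketch of a conforming dummy solver; dummy_solver is a made-up name for illustration only:

import numpy as np
import dask.array as da
from dask_glm.utils import normalize

@normalize
def dummy_solver(X, y, **kwargs):
    # Does no real optimisation; it only honours the new return contract
    # of (coefficients, number of iterations executed).
    return np.zeros(X.shape[1]), 0

X = da.from_array(np.array([[1.0, 0.0, 0.0], [1.0, 2.0, 2.0]]), chunks=(2, 3))
y = da.from_array(np.array([0, 1, 0]), chunks=(3,))

beta, n_iter = dummy_solver(X, y)                    # normalized path: rescaled beta plus n_iter
beta, n_iter = dummy_solver(X, y, normalize=False)   # pass-through path returns the tuple unchanged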