Skip to content

CSpline - Fix segment function and add utils to constrain parameter calculations #3593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 120 additions & 6 deletions pyomo/contrib/cspline_external/cspline_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,32 @@ def get_parameters_from_file(self, fptr):
self.a3 = np.array(self.a3)
self.a4 = np.array(self.a4)

def add_linear_extrapolation_segments(self):
"""Add a segment on the front and back of the cspline so that
any extrapolation will be linear."""
# We need to add a knot for a linear segment on the beginning and
# end. Since the first and last segment will be used for extrapolation,
# and we want them to be linear, it doesn't really matter how far out
# the knots are. To try to be roughly in line with the data scale we
# just use the distance from the first to the last knot.
dist = self.knots[-1] - self.knots[0]
x = np.array([self.knots[0], self.knots[-1]])
y = self.f(x)
m = self.dfdx(x)
b = y - m * x
k = np.array([self.knots[0] - dist, self.knots[-1] + dist])

self.knots = np.insert(self.knots, 0, k[0])
self.a1 = np.insert(self.a1, 0, b[0])
self.a2 = np.insert(self.a2, 0, m[0])
self.a3 = np.insert(self.a3, 0, 0)
self.a4 = np.insert(self.a4, 0, 0)
self.knots = np.append(self.knots, k[1])
self.a1 = np.append(self.a1, b[1])
self.a2 = np.append(self.a2, m[1])
self.a3 = np.append(self.a3, 0)
self.a4 = np.append(self.a4, 0)

def write_parameters(self, fptr):
"""Write parameters to a file"""
assert self.valid
Expand All @@ -180,11 +206,19 @@ def segment(self, x):
array of integers is returned otherwise return an integer
"""
s = np.searchsorted(self.knots, x)
# If x is past the last knot, use the last segment
# this could happen just due to round-off even if
# you don't intend to extrapolate
s[s >= self.n_segments] = self.n_segments - 1
return s
if isinstance(s, np.ndarray):
# if x is before the first knot use the first segment
s[s <= 0] = 1
# if x is after the last knot use last segment to extrapolate
s[s >= len(self.knots)] = len(self.knots) - 1
else:
if s <= 0:
# if x is before the first knot use the first segment
return 0
if s >= len(self.knots):
# if x is after the last knot use last segment to extrapolate
return len(self.knots) - 2
return s - 1

def f(self, x):
"""Get f(x)
Expand All @@ -198,6 +232,18 @@ def f(self, x):
s = self.segment(x)
return self.a1[s] + self.a2[s] * x + self.a3[s] * x**2 + self.a4[s] * x**3

def dfdx(self, x):
"""Get d/dx(f(x))

Args:
x: location, numpy array float

Returns:
df/dx numpy array if x is numpy array or float
"""
s = self.segment(x)
return self.a2[s] + 2 * self.a3[s] * x + 3 * self.a4[s] * x**2


def cubic_parameters_model(
x_data,
Expand Down Expand Up @@ -286,7 +332,7 @@ def yxx_eqn(blk, s):
def ydiff(blk, d):
s = idx[d - 1] + 1
if s >= m.seg_idx.last():
s -= 1
s = m.seg_idx.last()
return m.y_data[d] - _f_cubic(m.x_data[d], m.alpha, s)

if objective_form:
Expand All @@ -313,3 +359,71 @@ def yxx_endpoint_eqn(blk, s):
else:
j = s
return _fxx_cubic(m.x[j], m.alpha, s) == 0


def add_decreasing_constraints(m, tol=0):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These functions assume that m has a set knt_idx, and that they can create components with specific hardcoded names. This feels like these should be methods on a custom Block class that disallows users to create components so you don't have to worry about conflicts and know that the required attributes are present.

"""If the objective form of the parameter calculation is used, the
data and the spline don't need to match exactly, and we can add
constraints on the derivatives that they are negative at the knots.

This doesn't necessarily mean the cubic spline function is always
decreasing, since the segments are cubic, but you can either check the
resulting curve or pair it with convex or concave constraints.
"""

@m.Constraint(m.knt_idx)
def yx_ineq(blk, k):
if k >= len(m.knt_idx):
s = k - 1
else:
s = k
return _fx_cubic(m.x[k], m.alpha, s) <= -tol
Comment on lines +375 to +380
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good for these functions to be declared at the module scope so that they would be picklable. Given the use of the tol argument, maybe they should be functors:

class _yx_ineq(object):
    def __init__(self, tol):
        self.tol = tol

    def __call__(self, m, k):
        if k >= len(m.knt_idx):
            s = k - 1
        else:
            s = k
        return _fx_cubic(m.x[k], m.alpha, s) <= -self.tol

def add_decreasing_constraints(m, tol=0):
    m.yx_ineq = Constraint(m.knt_idx, rule=_yx_ineq(tol))



def add_concave_constraints(m, tol=0):
"""If the objective form of the parameter calculation is used, the
data and the spline don't need to match exactly, and we can add
constraints on the second derivatives that they are always negative.
"""

@m.Constraint(m.knt_idx)
def yxx_ineq(blk, k):
if k >= len(m.knt_idx):
s = k - 1
else:
s = k
return _fxx_cubic(m.x[k], m.alpha, s) <= -tol


def add_increasing_constraints(m, tol=0):
"""If the objective form of the parameter calculation is used, the
data and the spline don't need to match exactly, and we can add
constraints on the derivatives that they are positive at the knots.

This doesn't necessarily mean the cubic spline function is always
increasing, since the segments are cubic, but you can either check the
resulting curve or pair it with convex or concave constraints.
"""

@m.Constraint(m.knt_idx)
def yx_ineq(blk, k):
if k >= len(m.knt_idx):
s = k - 1
else:
s = k
return _fx_cubic(m.x[k], m.alpha, s) >= tol


def add_convex_constraints(m, tol=0):
"""If the objective form of the parameter calculation is used, the
data and the spline don't need to match exactly, and we can add
constraints on the second derivatives that they are always positive.
"""

@m.Constraint(m.knt_idx)
def yxx_ineq(blk, k):
if k >= len(m.knt_idx):
s = k - 1
else:
s = k
return _fxx_cubic(m.x[k], m.alpha, s) >= tol
127 changes: 127 additions & 0 deletions pyomo/contrib/cspline_external/test/test_parameter_calculation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from pyomo.contrib.cspline_external.cspline_parameters import (
cubic_parameters_model,
CsplineParameters,
add_increasing_constraints,
add_decreasing_constraints,
add_convex_constraints,
add_concave_constraints,
)
from pyomo.opt import check_available_solvers
from pyomo.common.dependencies import numpy as np, numpy_available
Expand Down Expand Up @@ -68,3 +72,126 @@ def test_param_gen(self):
y_pred = params.f(np.array(x_data))
for yd, yp in zip(y_data, y_pred):
self.assertAlmostEqual(yd, yp, 8)

# Make sure the predictions are correct
y_pred = params.f(np.array(x_data) + 1e-4)
for yd, yp in zip(y_data, y_pred):
self.assertAlmostEqual(yd, yp, 3)

# Make sure the predictions are correct
y_pred = params.f(np.array(x_data) - 1e-4)
for yd, yp in zip(y_data, y_pred):
self.assertAlmostEqual(yd, yp, 3)

self.assertAlmostEqual(1, params.f(5), 8)
self.assertAlmostEqual(1, params.f(5 - 1e-4), 3)
self.assertAlmostEqual(1, params.f(5 + 1e-4), 3)
self.assertAlmostEqual(2, params.f(1), 8)
self.assertAlmostEqual(2, params.f(1 - 1e-4), 3)
self.assertAlmostEqual(2, params.f(1 + 1e-4), 3)

def test_param_increasing(self):
x_data = [1, 2, 3, 4, 5]
y_data = [1, 4, 9, 16, 25]

m = cubic_parameters_model(x_data, y_data, objective_form=True)
add_increasing_constraints(m)
solver_obj = pyo.SolverFactory("ipopt")
solver_obj.solve(m)

params = CsplineParameters(model=m)

# Make sure the predictions are correct
y_pred = params.f(np.array(x_data))
for yd, yp in zip(y_data, y_pred):
self.assertAlmostEqual(yd, yp, 6)

x_new = [1.5, 2.5, 3.5, 4.5]
y_new = [2.25, 6.25, 12.25, 20.25]
# Check midpoints. These aren't necessarily super close due
# to the end point second derivative constraints, and I
# calculated test values from a quadratic.
y_pred = params.f(np.array(x_new))
for yd, yp in zip(y_new, y_pred):
self.assertAlmostEqual(yd, yp, 0)

def test_param_increasing_linear_extrap(self):
x_data = [1, 2, 3, 4, 5]
y_data = [1, 4, 9, 16, 25]

m = cubic_parameters_model(x_data, y_data, objective_form=True)
add_increasing_constraints(m)

solver_obj = pyo.SolverFactory("ipopt")
solver_obj.solve(m)

params = CsplineParameters(model=m)
params.add_linear_extrapolation_segments()

# Make sure the predictions are correct
y_pred = params.f(np.array(x_data))
for yd, yp in zip(y_data, y_pred):
self.assertAlmostEqual(yd, yp, 6)

x_new = [0, 6, 7, 8]
y_new = [-1.571, 34.42857, 43.8571, 53.2857]
y_pred = params.f(np.array(x_new))
for yd, yp in zip(y_new, y_pred):
self.assertAlmostEqual(yd, yp, 3)

def test_param_decreasing(self):
x_data = [-1, -2, -3, -4, -5]
y_data = [1, 4, 9, 16, 25]

m = cubic_parameters_model(x_data, y_data, objective_form=True)
add_decreasing_constraints(m)

solver_obj = pyo.SolverFactory("ipopt")
solver_obj.solve(m)

params = CsplineParameters(model=m)

# Make sure the predictions are correct
y_pred = params.f(np.array(x_data))
for yd, yp in zip(y_data, y_pred):
self.assertAlmostEqual(yd, yp, 5)

x_new = [-1.5, -2.5, -3.5, -4.5]
y_new = [2.25, 6.25, 12.25, 20.25]
# Check midpoints. These aren't necessarily super close due
# to the end point second derivative constraints, and I
# calculated test values from a quadratic.
y_pred = params.f(np.array(x_new))
for yd, yp in zip(y_new, y_pred):
self.assertAlmostEqual(yd, yp, 1)

def test_convex(self):
x_data = [-1, 1, 2, 3, 4, 5, 6]
y_data = [1, 1, 4, 11, 16, 25, 36]

m = cubic_parameters_model(
x_data, y_data, objective_form=True, end_point_constraint=False
)
add_convex_constraints(m, tol=0.1)
solver_obj = pyo.SolverFactory("ipopt")
solver_obj.solve(m)
params = CsplineParameters(model=m)
y_pred = params.f(np.array(x_data))
y_new = [1.007, 0.891, 4.524, 10.154, 16.536, 24.866, 36.0223]
for yd, yp in zip(y_new, y_pred):
self.assertAlmostEqual(yd, yp, 2)

def test_concave(self):
x_data = [1, 1, 2, 3, 4, 5, 6]
y_data = [-1, -1, -4, -14, -16, -25, -36]
m = cubic_parameters_model(
x_data, y_data, objective_form=True, end_point_constraint=False
)
add_concave_constraints(m, tol=0.1)
solver_obj = pyo.SolverFactory("ipopt")
solver_obj.solve(m)
params = CsplineParameters(model=m)
y_pred = params.f(np.array(x_data))
y_new = [-1.000, -1.000, -5.259, -11.3533, -17.5475, -24.80776, -36.032]
for yd, yp in zip(y_new, y_pred):
self.assertAlmostEqual(yd, yp, 2)
Loading