Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion databricks/koalas/missing/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class MissingPandasLikeSeries(object):
autocorr = _unsupported_function("autocorr")
between_time = _unsupported_function("between_time")
combine = _unsupported_function("combine")
cov = _unsupported_function("cov")
droplevel = _unsupported_function("droplevel")
ewm = _unsupported_function("ewm")
factorize = _unsupported_function("factorize")
Expand Down
45 changes: 45 additions & 0 deletions databricks/koalas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -4858,6 +4858,51 @@ def mad(self):

return mad

def cov(self, other: "Series", min_periods: Optional[int] = None) -> float:
"""
Compute covariance with Series, excluding missing values.

Parameters
----------
other : Series
Series with which to compute the covariance.
min_periods : int, optional
Minimum number of observations needed to have a valid result.

Returns
-------
float
Covariance between Series and other normalized by N-1
(unbiased estimator).

Examples
--------
>>> import databricks.koalas as ks
>>> ks.set_option("compute.ops_on_diff_frames", True)
>>> s1 = ks.Series([0.90010907, 0.13484424, 0.62036035])
>>> s2 = ks.Series([0.12528585, 0.26962463, 0.51111198])
>>> s1.cov(s2)
-0.01685762652715874
>>> ks.reset_option("compute.ops_on_diff_frames")
"""

if not isinstance(other, Series):
raise ValueError("'other' must be a Series")

if len(self.index) != len(other.index):
raise ValueError("series are not aligned")
Comment on lines +4892 to +4893
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this from? Seems like pandas works even with a different length of Series.

>>> pd.Series([1, 2, 3, 4]).cov(pd.Series([5, 6]))
0.5

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I missed it. Thanks, @ueshin .

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm this is interesting. Seems like pandas performs an alignment between the series before computing the covariance. So, this:

>>> pd.Series([1, 2, 3, 4]).cov(pd.Series([5, 6]))
0.5

And this:

>>> pd.Series([1, 2]).cov(pd.Series([5, 6]))
0.5

are equivalent... I believe this align is not supported in Koalas today. If this is a blocker I could open an issue and wait until somebody implements this. Another option I can think of is to go ahead and have a slightly different behavior for this edge case while we wait for the align implementation. Do you have any thoughts/preference on how to go about this @itholic @ueshin ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lopez- Could you file the issue for align?
Also, is it possible for you to implement it?


min_periods = 0 if min_periods is None else min_periods
if len(self.index) < min_periods or len(self.index) <= 1:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also compare len(self.index) with min_periods?

>>> pd.Series([1, 2]).cov(pd.Series([5, 6, 7, 8]), min_periods=3)
nan

return np.nan

if same_anchor(self, other):
# if the have the same anchor use the more performant Spark native `cov`
return self._internal.spark_frame.cov(self.name, other.name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._kdf._internal.resolved_copy.spark_frame.cov(
    self._internal.data_spark_column_names[0],
    other._internal.data_spark_column_names[0])

?

FYI: self.name won't always be the same as the underlying Spark DataFrame column name. See the description of #1554.

else:
# if not on the same anchor calculate covariance manually
return (self - self.mean()).dot(other - other.mean()) / (len(self.index) - 1)
Copy link
Contributor

@itholic itholic Jun 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(self.index) is performed four times in this code.

What do you think about we assign a proper variable and reuse it?
(ex. len_index = len(self.index) at the line above this variable is first used)

Comment on lines +4903 to +4904
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should create a new DataFrame and use it, something like:

kdf = self._kdf.copy()
tmp_column = verify_temp_column_name(kdf, '__tmp_column__')
kdf[tmp_column] = other
return kdf._kser_for(self._column_label).cov(kdf._kser_for(tmp_column), min_period=min_period)

I haven't checked the code, so please modify as it works.

Btw, we should do this at the beginning of this method to avoid extra checks for length or something.


def unstack(self, level=-1):
"""
Unstack, a.k.a. pivot, Series with MultiIndex to produce DataFrame.
Expand Down
26 changes: 26 additions & 0 deletions databricks/koalas/tests/test_ops_on_diff_frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,32 @@ def test_series_repeat(self):
else:
self.assert_eq(kser1.repeat(kser2).sort_index(), pser1.repeat(pser2).sort_index())

def test_cov(self):
pser = pd.Series([90, 91, 85])
kser = ks.from_pandas()
kser_other = ks.Series([90, 91, 85])
pser_other = kser_other.to_pandas()

self.assert_eq(kser.cov(kser_other), pser.cov(pser_other), almost=True)

kser = ks.Series([90])
pser = kser.to_pandas()
kser_other = ks.Series([85])
pser_other = kser_other.to_pandas()

k_isnan = np.isnan(kser.cov(kser_other))
p_isnan = np.isnan(pser.cov(pser_other))
self.assert_eq(k_isnan, p_isnan)

kser = ks.Series([90, 91, 85])
pser = kser.to_pandas()
kser_other = ks.Series([90, 91, 85])
pser_other = kser_other.to_pandas()

k_isnan = np.isnan(kser.cov(kser_other, 4))
p_isnan = np.isnan(pser.cov(pser_other, 4))
self.assert_eq(k_isnan, p_isnan)

Copy link
Contributor

@itholic itholic Jun 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a test when each Series has a different index and an Exception case?

For example,

        kser = ks.Series([90, 91, 85], index=[1, 2, 3])
        pser = kser.to_pandas()
        kser_other = ks.Series([90, 91, 85], index=[-1, -2, -3])
        pser_other = kser_other.to_pandas()

        self.assert_eq(kser.cov(kser_other), pser.cov(pser_other), almost=True)

and

    self.assertRaises(ValueError, lambda: kser.cov([90, 91, 85]))  # 'other' must be a Series
    self.assertRaises(ValueError, lambda: kser.cov(ks.Series([90])))  # series are not aligned


class OpsOnDiffFramesDisabledTest(ReusedSQLTestCase, SQLTestUtils):
@classmethod
Expand Down
22 changes: 22 additions & 0 deletions databricks/koalas/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1787,3 +1787,25 @@ def test_ffill(self):
kser.ffill(inplace=True)
pser.ffill(inplace=True)
self.assert_eq(repr(kser), repr(pser))

def test_cov(self):
kdf = ks.DataFrame({"A": [90, 91, 85], "B": [90, 91, 85]}, columns=["A", "B"])
pdf = kdf.to_pandas()

self.assert_eq(kdf.A.cov(kdf.B), pdf.A.cov(pdf.B), almost=True)

kdf = ks.DataFrame({"A": [90], "B": [90]}, columns=["A", "B"])
pdf = kdf.to_pandas()

k_cov = kdf.A.cov(kdf.B)
p_cov = pdf.A.cov(pdf.B)

self.assert_eq(np.isnan(k_cov), np.isnan(p_cov))

kdf = ks.DataFrame({"A": [90, 91, 85], "B": [90, 91, 85]}, columns=["A", "B"])
pdf = kdf.to_pandas()

k_cov = kdf.A.cov(kdf.B, 4)
p_cov = pdf.A.cov(pdf.B, 4)

self.assert_eq(np.isnan(k_cov), np.isnan(p_cov))