Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions roottest/python/distrdf/backends/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ if(test_distrdf_pyspark)
# Define environment variables needed in all pyspark tests
set(PYSPARK_ENV_VARS PYSPARK_PYTHON=${Python3_EXECUTABLE})

if(MACOSX_VERSION VERSION_GREATER_EQUAL 10.13)
if(CMAKE_SYSTEM_NAME STREQUAL Darwin AND CMAKE_SYSTEM_VERSION VERSION_GREATER_EQUAL 10.13)
# MacOS has changed rules about forking processes after 10.13
# Running pyspark tests with XCode Python3 throws crashes with errors like:
# `objc[17271]: +[__NSCFConstantString initialize] may have been in progress in another thread when fork() was called.`
Expand Down Expand Up @@ -41,11 +41,7 @@ list(APPEND DISTRDF_ENVIRONMENT_VARS DISTRDF_BACKENDS_IN_USE=${DISTRDF_BACKENDS_
# setting the property to 4. The test also locks a resource for the creation of
# the clusters, depending on how many backends are active. The resource lock
# is shared with the "common" folder and the tutorials of the main repository.
ROOTTEST_ADD_TEST(test_all
MACRO test_all.py
ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}"
TIMEOUT 1200
PROPERTIES PROCESSORS 4)
ROOT_ADD_PYUNITTEST(test_all test_all.py GENERIC ENVIRONMENT "${DISTRDF_ENVIRONMENT_VARS}")

# This test has to take multiple resource locks. This means that they should
# be passed as a cmake list (semi-colon separated strings) after the
Expand All @@ -55,4 +51,6 @@ ROOTTEST_ADD_TEST(test_all
# The safest and most reliable thing to do is to call the final cmake
# function directly here, so we can be sure that the PROPERTIES argument
# will be properly parsed.
set_tests_properties(roottest-python-distrdf-backends-test_all PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}")
set_tests_properties(
pyunittests-roottest-python-distrdf-backends-all
PROPERTIES RESOURCE_LOCK "${DISTRDF_RESOURCE_LOCKS}" PROCESSORS 4 TIMEOUT 1200)
34 changes: 21 additions & 13 deletions roottest/python/distrdf/backends/check_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def test_set_spark_context_with_conf(self, payload):
if backend == "spark":
import pyspark
from ROOT._distrdf.Backends.Spark import Backend

backend = Backend.SparkBackend(sparkcontext=connection)

assert isinstance(backend.sc, pyspark.SparkContext)
Expand All @@ -52,10 +53,12 @@ def test_optimize_npartitions(self, payload):
connection, backend = payload
if backend == "dask":
from ROOT._distrdf.Backends.Dask import Backend

backend = Backend.DaskBackend(daskclient=connection)
assert backend.optimize_npartitions() == 2
elif backend == "spark":
from ROOT._distrdf.Backends.Spark import Backend

backend = Backend.SparkBackend(sparkcontext=connection)
assert backend.optimize_npartitions() == 2

Expand Down Expand Up @@ -92,6 +95,7 @@ def test_initialization_method(self, payload):

def init(value):
import ROOT

cpp_code = f"int userValue = {value};"
ROOT.gInterpreter.ProcessLine(cpp_code)

Expand All @@ -111,13 +115,15 @@ def init(value):
# Finally, Histo1D returns a histogram filled with one value. The mean
# of this single value has to be the value itself, independently of
# the number of spawned workers.
df = ROOT.RDataFrame(1, executor=connection)
with pytest.warns(
UserWarning, match="Number of partitions 2 is greater than number of entries 1 in the dataframe"
):
df = ROOT.RDataFrame(1, executor=connection)

df = df.Define("u", "userValue").Histo1D(("name", "title", 1, 100, 130), "u")

df = df.Define("u", "userValue").Histo1D(
("name", "title", 1, 100, 130), "u")

h = df.GetValue()
assert h.GetMean() == 123
h = df.GetValue()
assert h.GetMean() == 123


class TestEmptyTreeError:
Expand Down Expand Up @@ -152,8 +158,7 @@ def test_count_with_some_empty_trees(self, payload):

connection, _ = payload
treenames = [f"tree_{i}" for i in range(3)]
filenames = [
f"../data/ttree/distrdf_roottest_check_backend_{i}.root" for i in range(3)]
filenames = [f"../data/ttree/distrdf_roottest_check_backend_{i}.root" for i in range(3)]

empty_treename = "empty"
empty_filename = "../data/ttree/empty.root"
Expand Down Expand Up @@ -218,8 +223,9 @@ def test_user_supplied_npartitions_have_precedence(self, payload):
class TestPropagateExceptions:
"""Tests that the C++ exceptions are properly propagated."""

@pytest.mark.skipif(platform.system() == "Darwin" and platform.machine() == "arm64",
reason="cannot catch exceptions on macOS arm64")
@pytest.mark.skipif(
platform.system() == "Darwin" and platform.machine() == "arm64", reason="cannot catch exceptions on macOS arm64"
)
def test_runtime_error_is_propagated(self, payload):
"""The test creates a TGraph with mixed scalar and vector columns."""
connection, backend = payload
Expand All @@ -231,14 +237,16 @@ def test_runtime_error_is_propagated(self, payload):
# DistRDF. Need to make this distinction so that the pytest.raises
# call does not get confused by the extra level of indirection
from py4j import protocol

raised_exc = protocol.Py4JJavaError
else:
raised_exc = RuntimeError # Dask always raises a Python RuntimeError
raised_exc = RuntimeError # Dask always raises a Python RuntimeError

df = df.Define("x", "1").Define("y", "ROOT::RVecF{1., 2., 3.}")
g = df.Graph("x", "y")
cpp_error_what = ("runtime_error: Graph was applied to a mix of scalar "
"values and collections. This is not supported.")
cpp_error_what = (
"runtime_error: Graph was applied to a mix of scalar values and collections. This is not supported."
)
with pytest.raises(raised_exc, match=cpp_error_what):
g.GetValue()

Expand Down
37 changes: 29 additions & 8 deletions roottest/python/distrdf/backends/check_explicit_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import textwrap
import warnings

import pytest
import ROOT
Expand All @@ -16,10 +15,10 @@ def test_histo_variations(self, payload):
connection, backend = payload
if backend == "dask":
RDataFrame = ROOT.RDF.Distributed.Dask.RDataFrame
df = RDataFrame(10, npartitions=2, daskclient=connection)
df = RDataFrame(10, npartitions=2, executor=connection)
elif backend == "spark":
RDataFrame = ROOT.RDF.Distributed.Spark.RDataFrame
df = RDataFrame(10, npartitions=2, sparkcontext=connection)
df = RDataFrame(10, npartitions=2, executor=connection)
df = df.Define("x", "1")
df1 = df.Vary("x", "ROOT::RVecI{-2,2}", ["down", "up"])
h = df1.Histo1D(("name", "title", 10, -10, 10), "x")
Expand Down Expand Up @@ -72,10 +71,10 @@ def test_redefine_one_column(self, payload):
connection, backend = payload
if backend == "dask":
RDataFrame = ROOT.RDF.Distributed.Dask.RDataFrame
df = RDataFrame(10, daskclient=connection)
df = RDataFrame(10, executor=connection)
elif backend == "spark":
RDataFrame = ROOT.RDF.Distributed.Spark.RDataFrame
df = RDataFrame(10, sparkcontext=connection)
df = RDataFrame(10, executor=connection)
df_before = df.Define("x", "1")
df_after = df_before.Redefine("x", "2")

Expand All @@ -89,11 +88,11 @@ def test_redefine_one_column(self, payload):


class TestDeprecation:
"""Test the deprecation message regarding the 'Experimental' module"""
"""Test deprecation warnings in the distributed module."""

def test_warning_message(self, payload):
def test_experimental_module_deprecation(self, payload):
connection, backend = payload
with warnings.catch_warnings(record=True) as warninglist:
with pytest.warns() as warninglist:
if backend == "dask":
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
df = RDataFrame(10, npartitions=2, executor=connection)
Expand Down Expand Up @@ -124,6 +123,28 @@ def test_warning_message(self, payload):

assert df.Count().GetValue() == 10

def test_backend_argument_deprecation(self, payload):
connection, backend = payload

if backend == "dask":
with pytest.warns(): # This will trigger the warning about experimental module deprecation tested above
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
with pytest.warns(
FutureWarning,
match="The keyword argument 'daskclient' is not necessary anymore and will be removed in a future release",
):
df = RDataFrame(10, npartitions=2, daskclient=connection)
assert df.Count().GetValue() == 10
elif backend == "spark":
with pytest.warns(): # This will trigger the warning about experimental module deprecation tested above
RDataFrame = ROOT.RDF.Experimental.Distributed.Spark.RDataFrame
with pytest.warns(
FutureWarning,
match="The keyword argument 'sparkcontext' is not necessary anymore and will be removed in a future release",
):
df = RDataFrame(10, npartitions=2, sparkcontext=connection)
assert df.Count().GetValue() == 10


if __name__ == "__main__":
pytest.main(args=[__file__])
72 changes: 41 additions & 31 deletions roottest/python/distrdf/backends/check_fromspec.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,99 @@
import pytest
import ROOT
import ROOT._distrdf


class TestFromSpec:

# RDataFrame is reading files containing CMSSW classes in this test, which will trigger a warning from TClass about
# missing class dictionaries. The warning is only triggered once for the entire duration of the process. Within the
# same process, we may be running the same test multiple times (once per parametrized backend in use). So we can't
# use programmatic warning catching e.g. with pytest.warns because it would always fail on consecutive runs of this
# test. We filter out that warning specifically instead
@pytest.mark.filterwarnings("ignore:no dictionary for class")
def test_fromspec_different_trees(self, payload):
"""
Test usage of FromSpec function when each sample has different trees
"""

connection, _ = payload

jsonfile = "../data/ttree/spec_differenttrees.json"
df = ROOT.RDF.Experimental.FromSpec(jsonfile, executor = connection)


df = ROOT.RDF.Experimental.FromSpec(jsonfile, executor=connection)
df_checkfilt = df.FilterAvailable("nElectron").Filter("nElectron > 2")

df_new = df.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")')
df_filtered = df_new.Filter("lum == 100.")
df_filtered_two = df_new.Filter("lum == 200.")

df_local = ROOT.RDF.Experimental.FromSpec(jsonfile)
df_new_local = df_local.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")')
df_filtered_local = df_new_local.Filter("lum == 100.")
df_filtered_two_local = df_new_local.Filter("lum == 200.")

assert df_checkfilt.Count().GetValue() == 1683
assert df_filtered.Count().GetValue() == 11000
assert df_filtered.Count().GetValue() == df_filtered_local.Count().GetValue()

assert df_filtered_two.Count().GetValue() == 13020
assert df_filtered_two.Count().GetValue() == df_filtered_two_local.Count().GetValue()
df_checkfilt_count = df_checkfilt.Count()
df_filtered_count = df_filtered.Count()
df_filtered_local_count = df_filtered_local.Count()
df_filtered_two_count = df_filtered_two.Count()
df_filtered_two_local_count = df_filtered_two_local.Count()

assert df_checkfilt_count.GetValue() == 1683
assert df_filtered_count.GetValue() == 11000
assert df_filtered_count.GetValue() == df_filtered_local_count.GetValue()

assert df_filtered_two_count.GetValue() == 13020
assert df_filtered_two_local_count.GetValue() == df_filtered_two_local_count.GetValue()

def test_fromspec_files_multiple_trees(self, payload):
"""
Test usage of FromSpec function when some samples have multiple trees
"""
connection, _ = payload

jsonfile_two = "../data/ttree/spec.json"
rdf = ROOT.RDF.Experimental.FromSpec(jsonfile_two, executor = connection)

rdf = ROOT.RDF.Experimental.FromSpec(jsonfile_two, executor=connection)

rdf_filt = rdf.FilterAvailable("b1").Filter("b1 > 42")

nentries = 3000

assert rdf_filt.Count().GetValue() == nentries
assert rdf_filt.Mean("b1").GetValue() == 50

rdf_new = rdf.DefinePerSample("lum", 'rdfsampleinfo_.GetD("lum")')

rdf_filtered = rdf_new.Filter("lum == 200.")
rdf_filtered_two = rdf_new.Filter("lum == 100.")
rdf_filtered_three = rdf_new.Filter("lum == 5.")
rdf_filtered_three = rdf_new.Filter("lum == 5.")
rdf_filtered_four = rdf_new.Filter("lum == 10.")

assert rdf_filtered.Count().GetValue() == 30000
assert rdf_filtered_two.Count().GetValue() == 3000
assert rdf_filtered_two.Count().GetValue() == 3000
assert rdf_filtered_three.Count().GetValue() == 2
assert rdf_filtered_four.Count().GetValue() == 3

def test_fromspec_with_friends(self, payload):
"""
Test usage of FromSpec function when friends trees are added
Test usage of FromSpec function when friends trees are added
"""

connection, _ = payload

jsonfile_three = "../data/ttree/spec_withfriends.json"
rdf_friends = ROOT.RDF.Experimental.FromSpec(jsonfile_three, executor = connection)
rdf_friends = ROOT.RDF.Experimental.FromSpec(jsonfile_three, executor=connection)

rdf_friends_new = rdf_friends.DefinePerSample("lumi", 'rdfsampleinfo_.GetD("lumi")')

rdf_friends_filtered = rdf_friends_new.Filter("lumi == 1.")
rdf_friends_filtered_two = rdf_friends_new.Filter("lumi == 0.5")

rdf_friends_values = rdf_friends_filtered_two.Filter("friendTree.z > 103")
rdf_friends_values_two = rdf_friends.Filter("friendChain1.z > 102")

assert rdf_friends_filtered.Count().GetValue() == 4
assert rdf_friends_filtered_two.Count().GetValue() == 1
assert rdf_friends_values.Count().GetValue() == 1
assert rdf_friends_values_two.Count().GetValue() == 2


if __name__ == "__main__":
pytest.main(args=[__file__])
4 changes: 2 additions & 2 deletions roottest/python/distrdf/backends/check_reducer_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,10 @@ def test_alias(self, payload):
connection, backend = payload
if backend == "dask":
RDataFrame = ROOT.RDF.Distributed.Dask.RDataFrame
df = RDataFrame(10, daskclient=connection)
df = RDataFrame(10, executor=connection)
elif backend == "spark":
RDataFrame = ROOT.RDF.Distributed.Spark.RDataFrame
df = RDataFrame(10, sparkcontext=connection)
df = RDataFrame(10, executor=connection)
df = df.Define("x", "1")
df_alias = df.Alias("myalias", "x")

Expand Down
18 changes: 9 additions & 9 deletions roottest/python/distrdf/backends/check_variations.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ def test_mixed(self, payload):
connection, _ = payload
df = ROOT.RDataFrame(10, executor=connection, npartitions=2)
df = df.Define("x", "1").Define("y", "42")
h = df.Vary("x", "ROOT::RVecI{-1, 2}",
variationTags=["down", "up"]).Histo1D(("name", "title", 10, -500, 500), "x", "y")
h = df.Vary("x", "ROOT::RVecI{-1, 2}", variationTags=["down", "up"]).Histo1D(
("name", "title", 10, -500, 500), "x", "y"
)
histos = ROOT.RDF.Experimental.VariationsFor(h)

expectednames = ["nominal", "x:down", "x:up"]
Expand All @@ -59,9 +60,9 @@ def test_simultaneous(self, payload):
connection, _ = payload
df = ROOT.RDataFrame(10, executor=connection, npartitions=2)
df = df.Define("x", "1").Define("y", "42")
h = df.Vary(["x", "y"],
"ROOT::RVec<ROOT::RVecI>{{-1, 2, 3}, {41, 43, 44}}",
["down", "up", "other"], "xy").Histo1D(("name", "title", 10, -500, 500), "x", "y")
h = df.Vary(
["x", "y"], "ROOT::RVec<ROOT::RVecI>{{-1, 2, 3}, {41, 43, 44}}", ["down", "up", "other"], "xy"
).Histo1D(("name", "title", 10, -500, 500), "x", "y")
histos = ROOT.RDF.Experimental.VariationsFor(h)

expectednames = ["nominal", "xy:down", "xy:up", "xy:other"]
Expand All @@ -77,8 +78,7 @@ def test_varyfiltersum(self, payload):
connection, _ = payload
df = ROOT.RDataFrame(10, executor=connection, npartitions=2)
df = df.Define("x", "1")
df_sum = df.Vary(
"x", "ROOT::RVecI{-1*x, 2*x}", ("down", "up"), "myvariation").Filter("x > 0").Sum("x")
df_sum = df.Vary("x", "ROOT::RVecI{-1*x, 2*x}", ("down", "up"), "myvariation").Filter("x > 0").Sum("x")

assert df_sum.GetValue() == 10

Expand All @@ -93,10 +93,10 @@ def test_variationsfor_novary(self, payload):
connection, backend = payload
if backend == "dask":
RDataFrame = ROOT.RDF.Distributed.Dask.RDataFrame
df = RDataFrame(1, npartitions=1, daskclient=connection)
df = RDataFrame(1, npartitions=1, executor=connection)
elif backend == "spark":
RDataFrame = ROOT.RDF.Distributed.Spark.RDataFrame
df = RDataFrame(1, npartitions=1, sparkcontext=connection)
df = RDataFrame(1, npartitions=1, executor=connection)

df = df.Define("x", "1")
h = df.Histo1D(("h", "h", 1, 0, 10), "x")
Expand Down
Loading
Loading