diff --git a/app.py b/app.py index 581169a..3d97855 100644 --- a/app.py +++ b/app.py @@ -34,6 +34,7 @@ from apps.routes.v1.anomaly.api import ns as ns_anomaly from apps.routes.v1.ssoft.api import ns as ns_ssoft from apps.routes.v1.metadata.api import ns as ns_metadata +from apps.routes.v1.recommender.api import ns as ns_recommender config = extract_configuration("config.yml") @@ -79,6 +80,7 @@ def after_request(response): api.add_namespace(ns_anomaly) api.add_namespace(ns_ssoft) api.add_namespace(ns_metadata) +api.add_namespace(ns_recommender) # Register blueprint app.register_blueprint(blueprint) diff --git a/apps/routes/v1/recommender/README.md b/apps/routes/v1/recommender/README.md new file mode 100644 index 0000000..f835daa --- /dev/null +++ b/apps/routes/v1/recommender/README.md @@ -0,0 +1,129 @@ +# Recommender system + +Here are the first applications for the graph component in Fink. + +## Tag-based analysis + +Idea: users tag objects. From these tags, they can search for closest objects in the graph. We start from a few tags (the seed), and as we acquire more tags, the search should become more personalised (from seed to tree). + +Questions: +- how many tags are required to get relevant results? +- how many tags should be available? +- how to handle different tags for the same object? +- can we also automate some tagging mechanism (i.e. from manual tag assignment to automatic science module)? + +## Classification transfer + +Goal: compare two classification schemes (how C0 and C1 relate to each other). 
+ +## Todo + +todo: +- [ ] `recommender` route in the API to get similar objects +- [ ] New HBase table to store obj/tags +- [ ] Add tag button in web portal +- [ ] Add tag as classification mechanism based on tags in graph tools +- [ ] Contact pilot teams to test capabilities + +## Profiling + +```python +Total time: 0.26651 s +File: /opt/fink-object-api/apps/utils/client.py +Function: connect_to_graph at line 60 + +Line # Hits Time Per Hit % Time Line Contents +============================================================== + 60 @profile + 61 def connect_to_graph(): + 62 """Return a client connected to a graph""" + 63 1 5578.6 5578.6 2.1 config = extract_configuration("config.yml") + 64 1 240.1 240.1 0.1 gateway = JavaGateway(auto_convert=True) + 65 + 66 1 256935.3 256935.3 96.4 jc = gateway.jvm.com.Lomikel.Januser.JanusClient(config["PRO… + 67 + 68 # TODO: add definition of IP/PORT/TABLE/SCHEMA here in new v… + 69 1 2468.6 2468.6 0.9 gr = gateway.jvm.com.astrolabsoftware.FinkBrowser.Januser.Fi… + 70 + 71 1 1287.6 1287.6 0.5 return gr, gateway.jvm.com.astrolabsoftware.FinkBrowser.Janu… + + +Total time: 1.60666 s +File: /opt/fink-object-api/apps/routes/v1/recommender/utils.py +Function: extract_similar_objects at line 24 + +Line # Hits Time Per Hit % Time Line Contents +============================================================== + 24 @profile + 25 def extract_similar_objects(payload: dict) -> pd.DataFrame: + 26 """Extract similar objects returned by JanusGraph and format… + 27 + 28 Data is from /api/v1/recommender + 29 + 30 Parameters + 31 ---------- + 32 payload: dict + 33 See https://api.fink-portal.org + 34 + 35 Return + 36 ---------- + 37 out: pandas dataframe + 38 """ + 39 1 1.2 1.2 0.0 if "n" not in payload: + 40 1 0.5 0.5 0.0 nobjects = 10 + 41 else: + 42 nobjects = int(payload["n"]) + 43 + 44 1 0.2 0.2 0.0 if "classifier" not in payload: + 45 1 0.2 0.2 0.0 classifier_name = "FINK_PORTAL" + 46 else: + 47 classifier_name = payload["classifier"] + 48 + 49 1 
7097.0 7097.0 0.4 user_config = extract_configuration("config.yml") + 50 + 51 1 266750.0 266750.0 16.6 gr, classifiers = connect_to_graph() + 52 + 53 # Classify source + 54 1 377.3 377.3 0.0 func = getattr(classifiers, classifier_name) + 55 2 1110560.4 555280.2 69.1 gr.classifySource( + 56 1 0.7 0.7 0.0 func, + 57 1 1.3 1.3 0.0 payload["objectId"], + 58 2 26.3 13.2 0.0 '{}:{}:{}'.format( + 59 1 1.5 1.5 0.0 user_config["HBASEIP"], + 60 1 1.7 1.7 0.0 user_config["ZOOPORT"], + 61 1 1.0 1.0 0.0 user_config["SCHEMAVER"] + 62 ), + 63 1 1.0 1.0 0.0 False, + 64 1 0.6 0.6 0.0 None + 65 ) + 66 + 67 2 206912.5 103456.3 12.9 closest_sources = gr.sourceNeighborhood( + 68 1 2.3 2.3 0.0 payload["objectId"], + 69 1 0.7 0.7 0.0 classifier_name + 70 ) + 71 1 3.0 3.0 0.0 out = {"i:objectId": [], "v:distance": [], "v:classification… + 72 1 871.1 871.1 0.1 for index, (oid, distance) in enumerate(closest_sources.item… + 73 if index > nobjects: + 74 break + 75 + 76 r = requests.post( + 77 "https://api.fink-portal.org/api/v1/objects", + 78 json={ + 79 "objectId": oid, + 80 "output-format": "json" + 81 } + 82 ) + 83 out["v:classification"].append(r.json()[0]["v:classifica… + 84 out["i:objectId"].append(oid) + 85 out["v:distance"].append(distance) + 86 + 87 1 1211.7 1211.7 0.1 pdf = pd.DataFrame(out) + 88 + 89 1 12835.5 12835.5 0.8 gr.close() + 90 + 91 1 3.8 3.8 0.0 return pdf + + + 0.27 seconds - /opt/fink-object-api/apps/utils/client.py:60 - connect_to_graph + 1.61 seconds - /opt/fink-object-api/apps/routes/v1/recommender/utils.py:24 - extract_similar_objects +``` diff --git a/apps/routes/v1/recommender/__init__.py b/apps/routes/v1/recommender/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/routes/v1/recommender/api.py b/apps/routes/v1/recommender/api.py new file mode 100644 index 0000000..b1c06c0 --- /dev/null +++ b/apps/routes/v1/recommender/api.py @@ -0,0 +1,85 @@ +# Copyright 2025 AstroLab Software +# Author: Julien Peloton +# +# Licensed under the Apache 
from flask import Response, request
from flask_restx import Namespace, Resource, fields

from apps.utils.utils import check_args
from apps.utils.utils import send_tabular_data

from apps.routes.v1.recommender.utils import extract_similar_objects

ns = Namespace("api/v1/recommender", "Get similar objects via Fink recommender system")

# Description of the arguments accepted by the route. It is used both to
# validate incoming payloads (via check_args) and to document the endpoint.
# NOTE(review): the model is registered under the name "objects" -- confirm
# this does not clash with a model of the same name in another namespace.
# NOTE(review): "classifier" is declared required here, while
# extract_similar_objects falls back to "FINK_PORTAL" when it is missing --
# confirm which of the two behaviours is intended.
ARGS = ns.model(
    "objects",
    {
        "objectId": fields.String(
            description="ZTF Object ID",
            example="ZTF21abfmbix",
            required=True,
        ),
        "classifier": fields.String(
            description="Edge classifier to use: FINK_PORTAL or FEATURES",
            example="FINK_PORTAL",
            required=True,
        ),
        "n": fields.Integer(
            description="N closest objects (upper bound). Default is 10",
            example=10,
            required=False,
        ),
        "output-format": fields.String(
            description="Output format among json[default], csv, parquet, votable.",
            example="json",
            required=False,
        ),
    },
)


# Route exposing the recommender: GET and POST share one implementation.
@ns.route("")
@ns.doc(params={name: ARGS[name].description for name in ARGS})
class Recommender(Resource):
    def get(self):
        """Retrieve similar objects via Fink recommender system"""
        # A bare GET (no query arguments) only returns the namespace
        # description; a GET carrying arguments is served like a POST.
        if len(request.args) == 0:
            return Response(ns.description, 200)
        return self.post()

    @ns.expect(ARGS, location="json", as_dict=True)
    def post(self):
        """Retrieve similar objects via Fink recommender system"""
        # Arguments given in the query URL take precedence; the JSON body is
        # only consulted when the query string carries nothing.
        query_args = request.args
        has_query_args = query_args is not None and len(query_args) != 0
        payload = query_args if has_query_args else request.json

        validation = check_args(ARGS, payload)
        if validation["status"] != "ok":
            return Response(str(validation), 400)

        result = extract_similar_objects(payload)

        # A Response coming back from the extraction is an error report:
        # hand it to the client untouched.
        if isinstance(result, Response):
            return result

        return send_tabular_data(result, payload.get("output-format", "json"))
"""Run extract_similar_objects once on a fixed object.

The script builds a minimal payload (object identifier only) and calls the
recommender extraction a single time at module level.
"""

from apps.routes.v1.recommender.utils import extract_similar_objects

# Only the object identifier is supplied; every other key is left out.
payload = dict(objectId="ZTF21abfmbix")

extract_similar_objects(payload)
import requests
import pandas as pd

import io
import sys

# Base URL of the API under test, given as first command-line argument
APIURL = sys.argv[1]

# Implement random name generator
OID = "ZTF21abfmbix"


def get_recommendation(
    oid="ZTF21abfmbix",
    classifier="FINK_CLASS",
    nobjects=10,
    output_format="json",
):
    """Get recommendation from the classification graph using the Fink REST API

    Parameters
    ----------
    oid: str
        Object identifier sent as `objectId`.
    classifier: str
        Classifier name sent as `classifier`.
        NOTE(review): the route documents "FINK_PORTAL or FEATURES" as
        possible values -- confirm that the default "FINK_CLASS" is valid.
    nobjects: int
        Number of closest objects requested, sent as `n`.
    output_format: str
        One of "json", "csv" or "parquet".

    Returns
    -------
    pdf: pandas dataframe
        Body of the response parsed according to `output_format`.

    Raises
    ------
    ValueError
        If `output_format` cannot be parsed by this helper.
    AssertionError
        If the API does not answer with HTTP status 200.
    """
    readers = {
        "json": pd.read_json,
        "csv": pd.read_csv,
        "parquet": pd.read_parquet,
    }
    # Reject unsupported formats explicitly; without this check the function
    # would end on an unbound local variable after the request was made.
    if output_format not in readers:
        raise ValueError(
            "Unsupported output format {!r}, choose among {}".format(
                output_format, sorted(readers)
            )
        )

    payload = {
        "objectId": oid,
        "classifier": classifier,
        "n": nobjects,
        "output-format": output_format,
    }

    r = requests.post("{}/api/v1/recommender".format(APIURL), json=payload)

    assert r.status_code == 200, r.content

    # Format output in a DataFrame
    return readers[output_format](io.BytesIO(r.content))


def test_object() -> None:
    """Check that the default query returns 10 recommendations

    Examples
    --------
    >>> test_object()
    """
    pdf = get_recommendation(oid=OID)

    assert not pdf.empty

    assert len(pdf) == 10, len(pdf)


def test_stability() -> None:
    """Check that asking for more objects does not change the first ones

    Examples
    --------
    >>> test_stability()
    """
    # The keyword is `nobjects` (it is sent to the API as `n`)
    pdf1 = get_recommendation(oid=OID, nobjects=20)
    pdf2 = get_recommendation(oid=OID, nobjects=10)

    assert len(pdf1) == 20, len(pdf1)
    assert len(pdf2) == 10, len(pdf2)

    # first 10 must be the same
    assert pdf1.head(10).equals(pdf2), (pdf1, pdf2)


if __name__ == "__main__":
    # Execute the test suite: exit code is the number of failed doctests
    import doctest

    sys.exit(doctest.testmod()[0])
import pandas as pd
import requests

from apps.utils.client import connect_to_graph

from line_profiler import profile


@profile
def extract_similar_objects(payload: dict) -> pd.DataFrame:
    """Extract similar objects returned by JanusGraph and format it in a Pandas dataframe

    Data is from /api/v1/recommender

    The source is first classified in the graph with the requested
    classifier, then its neighbourhood is retrieved, and finally the
    classification of each neighbour is fetched from the Fink objects API.

    Parameters
    ----------
    payload: dict
        Request arguments. Keys read here:
        `objectId` (mandatory), `n` (optional, default 10) and
        `classifier` (optional, default "FINK_PORTAL").
        See https://api.fink-portal.org

    Returns
    -------
    out: pandas dataframe
        One row per neighbour, with columns `i:objectId`, `v:distance`
        and `v:classification`.

    Raises
    ------
    KeyError
        If `objectId` is missing from the payload.
    requests.HTTPError
        If the objects API answers with an error status.
    """
    # Upper bound on each call to the objects API (seconds), so that a
    # stalled remote call cannot block the request forever.
    request_timeout = 30

    nobjects = int(payload.get("n", 10))
    classifier_name = payload.get("classifier", "FINK_PORTAL")
    # Read before connecting: a missing identifier fails without opening
    # (and leaking) a graph connection.
    object_id = payload["objectId"]

    gr, classifiers = connect_to_graph()
    try:
        # Classify source
        # NOTE(review): classifier_name comes from the request and is used
        # for an attribute lookup -- confirm it is validated upstream.
        func = getattr(classifiers, classifier_name)
        gr.classifySource(
            func,
            object_id,
            None,
            False,
            None,
        )

        closest_sources = gr.sourceNeighborhood(
            object_id, classifier_name, nobjects
        )

        out = {"i:objectId": [], "v:distance": [], "v:classification": []}
        # One session for all lookups: the connection is reused instead of
        # being re-established for every neighbour.
        with requests.Session() as session:
            for entry, _ in closest_sources.items():
                oid = entry.getKey()
                distance = entry.getValue()
                r = session.post(
                    "https://api.fink-portal.org/api/v1/objects",
                    json={"objectId": oid, "output-format": "json"},
                    timeout=request_timeout,
                )
                r.raise_for_status()
                records = r.json()
                # An empty reply yields a missing classification rather
                # than an IndexError for the whole request.
                classification = records[0]["v:classification"] if records else None

                out["v:classification"].append(classification)
                out["i:objectId"].append(oid)
                out["v:distance"].append(distance)

        return pd.DataFrame(out)
    finally:
        # Always release the graph client, including on errors
        gr.close()
@profile
def connect_to_graph(tablename="ztf"):
    """Open a graph client and return it with the available classifiers

    Parameters
    ----------
    tablename: str
        Table name inserted in the connection string given to `fhclient`.
        Default is "ztf".

    Returns
    -------
    out: tuple
        The graph recipes object built on top of the Janus client, and
        the `Classifiers` class exposed by the JVM gateway.
    """
    settings = extract_configuration("config.yml")
    gateway = JavaGateway(auto_convert=True)
    jvm = gateway.jvm

    janus_client = jvm.com.Lomikel.Januser.JanusClient(settings["PROPERTIES"])

    # TODO: add definition of IP/PORT/TABLE/SCHEMA here in new version of client
    recipes = jvm.com.astrolabsoftware.FinkBrowser.Januser.FinkGremlinRecipiesG(
        janus_client
    )

    # Connection string layout: IP:PORT:TABLE:SCHEMA
    hbase_ip = settings["HBASEIP"]
    zoo_port = settings["ZOOPORT"]
    schema_version = settings["SCHEMAVER"]
    recipes.fhclient(f"{hbase_ip}:{zoo_port}:{tablename}:{schema_version}")

    classifiers = jvm.com.astrolabsoftware.FinkBrowser.Januser.Classifiers
    return recipes, classifiers
BASEURL=https://hrivnac.web.cern.ch/Activities/Packages/Lomikel -CLIENTVERSION=03.04.00x +CLIENTVERSION=03.06.00 - -files="Lomikel-$CLIENTVERSION-ext.jar Lomikel-$CLIENTVERSION.exe.jar Lomikel-$CLIENTVERSION-HBase.jar Lomikel-$CLIENTVERSION-HBase.exe.jar Lomikel-$CLIENTVERSION.jar" +files="Lomikel-$CLIENTVERSION-ext.jar Lomikel-$CLIENTVERSION.exe.jar Lomikel-$CLIENTVERSION-HBase.jar Lomikel-$CLIENTVERSION-Janus.jar Lomikel-$CLIENTVERSION-Janus.exe.jar Lomikel-$CLIENTVERSION-HBase.exe.jar Lomikel-$CLIENTVERSION.jar" for file in $files; do echo $file diff --git a/conf/IJCLab.properties.template b/conf/IJCLab.properties.template new file mode 100644 index 0000000..2d1b0d7 --- /dev/null +++ b/conf/IJCLab.properties.template @@ -0,0 +1,31 @@ +storage.backend=hbase +storage.hostname=# fillme +storage.port=# fillme +storage.hbase.table=janusgraph + +cache.db-cache=true +cache.db-cache-clean-wait=20 +cache.db-cache-time=180000 +cache.db-cache-size=0.5 + +index.search.backend=elasticsearch +index.search.hostname=# fillme +index.search.elasticsearch.client-only=false +index.search.elasticsearch.local-mode=true +index.search.elasticsearch.bulk-refresh=true +#index.search.elasticsearch.health-request-timeout=60s +#index.search.elasticsearch.refresh_interval=30 +#index.search.elasticsearch.requestTimeout=60 +#index.search.elasticsearch.max_retries=10 +#index.search.elasticsearch.retry_on_timeout=true + +gremlin.graph=org.janusgraph.core.JanusGraphFactory + +backend.hbase.table=ztf +backend.hbase.port=# fillme +backend.hbase.schema=# fillme +backend.rowkey.name=rowkey + +backend.phoenix.url= +backend.phoenix.proxy.hostname= +backend.phoenix.proxy.port= diff --git a/config.yml b/config.yml index 033aa5e..1efc622 100644 --- a/config.yml +++ b/config.yml @@ -21,3 +21,6 @@ NLIMIT: 10000 WEBHDFS: USER: NAMENODE: + +# Graph +PROPERTIES: conf/IJCLab.properties