diff --git a/examples/load_graph_archngv.py b/examples/load_graph_archngv.py
index 73742b7..142cb43 100755
--- a/examples/load_graph_archngv.py
+++ b/examples/load_graph_archngv.py
@@ -1,14 +1,21 @@
+#!/usr/bin/env python
+# coding: utf-8
+
 import argparse
+import base64
 import multiprocessing
 import pickle
 from functools import partial
+from os import environ
 from pathlib import Path
 
 import numpy as np
 import pandas as pd
 import psutil
+import requests
 from archngv import NGVCircuit
 from joblib import Parallel, delayed, parallel_config
+from kgforge.core import KnowledgeGraphForge
 from tqdm import tqdm
 
 from astrovascpy import bloodflow
@@ -16,6 +23,91 @@ from astrovascpy.utils import Graph
 
 
+def get_nexus_token(
+    client_id="bbp-molsys-sa",
+    environ_name="KCS",
+    nexus_url="https://bbpauth.epfl.ch/auth/realms/BBP/protocol/openid-connect/token",
+):
+    """
+    Retrieve a Nexus token from Keycloak.
+    param:
+        client_id(str): the Keycloak client id
+        environ_name(str): the name of the environment variable that holds the Keycloak secret
+        nexus_url(str): the Keycloak token endpoint URL
+    """
+    try:
+        client_secret = environ[environ_name]
+
+        # BBP Keycloak token endpoint
+        url = nexus_url
+
+        payload = "grant_type=client_credentials&scope=openid"
+        authorization = base64.b64encode(f"{client_id}:{client_secret}".encode("utf-8")).decode(
+            "ascii"
+        )
+        headers = {
+            "Content-Type": "application/x-www-form-urlencoded",
+            "Authorization": f"Basic {authorization}",
+        }
+
+        # request the token
+        r = requests.request(
+            "POST",
+            url=url,
+            headers=headers,
+            data=payload,
+            timeout=30,
+        )
+
+        # extract the access token from the response
+        nexus_token = r.json()["access_token"]
+        return nexus_token
+    except Exception as error:
+        print(f"Error: {error}")
+        return None
+
+
+def get_nexus_circuit_conf(
+    circuit_name, nexus_org="bbp", nexus_project="mmb-neocortical-regions-ngv"
+):
+    """
+    Retrieve the config path of an NGV circuit registered in Nexus.
+    param:
+        circuit_name(str): the Nexus name of the NGV circuit to load
+        nexus_org(str): the Nexus organisation
+        nexus_project(str): the Nexus project that holds the circuit
+    """
+
+    nexus_token = get_nexus_token()
+    if nexus_token is None:
+        print("Error: Cannot get a valid Nexus token")
+        return None
+
+    nexus_endpoint = "https://bbp.epfl.ch/nexus/v1"  # production environment
+
+    forge = KnowledgeGraphForge(
+        "https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/notebooks/use-cases/prod-forge-nexus.yml",
+        endpoint=nexus_endpoint,
+        bucket=f"{nexus_org}/{nexus_project}",
+        token=nexus_token,
+        debug=True,
+    )
+
+    p = forge.paths("Dataset")
+    resources = forge.search(p.type == "DetailedCircuit", p.name == circuit_name, limit=30)
+
+    if len(resources) != 1:
+        print(f"Error: expected one NGV circuit named '{circuit_name}', found {len(resources)}")
+        return None
+
+    circuit = resources[0]
+    circuit_config_path = circuit.circuitConfigPath.url
+    circuit_config_path = circuit_config_path[len("file://") :]
+    print(f"circuitConfigPath: {circuit_config_path}")
+
+    return circuit_config_path
+
+
 def load_graph_archngv_parallel(
     filename, n_workers, n_astro=None, parallelization_backend="multiprocessing"
 ):
@@ -63,7 +155,11 @@ def load_graph_archngv_parallel(
         with multiprocessing.Pool(n_workers) as pool:
             for result_ids, result_endfeet in zip(
                 tqdm(
-                    pool.imap(worker, args, chunksize=max(1, int(len(endfoot_ids) / n_workers))),
+                    pool.imap(
+                        worker,
+                        args,
+                        chunksize=max(1, int(len(endfoot_ids) / n_workers)),
+                    ),
                     total=len(endfoot_ids),
                 ),
                 endfoot_ids,
@@ -75,7 +171,10 @@ def load_graph_archngv_parallel(
 
     elif parallelization_backend == "joblib":
         with parallel_config(
-            backend="loky", prefer="processes", n_jobs=n_workers, inner_max_num_threads=1
+            backend="loky",
+            prefer="processes",
+            n_jobs=n_workers,
+            inner_max_num_threads=1,
         ):
             parallel = Parallel(return_as="generator", batch_size="auto")
             parallelized_region = parallel(
@@ -101,15 +200,29 @@ def main():
     print = partial(print, flush=True)
 
     parser = argparse.ArgumentParser(description="File paths for NGVCircuits and output graph.")
+    parser.add_argument("--circuit-name", type=str, required=False, help="Nexus name of the NGV circuit")
+    parser.add_argument("--circuit-path", type=str, required=False, help="Path to the NGV circuit")
     parser.add_argument(
-        "--filename_ngv", type=str, required=True, help="Path to the NGV circuits file"
-    )
-    parser.add_argument(
-        "--output_graph", type=str, required=True, help="Path to the output graph file"
+        "--output-graph", type=str, required=True, help="Path to the output graph file"
     )
     args = parser.parse_args()
 
-    filename_ngv = args.filename_ngv
+    if args.circuit_name is not None:
+        circuit_name = args.circuit_name
+        filename_ngv = get_nexus_circuit_conf(circuit_name)
+        if filename_ngv is None:
+            print("Error: Could not obtain a valid file path for the NGV circuit")
+            return -1
+
+    elif args.circuit_path is not None:
+        filename_ngv = args.circuit_path
+
+    else:
+        print("ERROR: either --circuit-name or --circuit-path must be provided")
+        return -1
+
+    print(f"INFO: filename_ngv: {filename_ngv}")
+
     output_graph = args.output_graph
 
     n_cores = psutil.cpu_count(logical=False)
@@ -123,11 +236,12 @@ def main():
     print("loading circuit : finish")
 
     print("pickle graph : start")
-    filehandler = open(output_graph, "wb")
-    pickle.dump(graph, filehandler)
-    print("pickle graph : finish")
+    with open(output_graph, "wb") as filehandler:
+        pickle.dump(graph, filehandler)
+    print("pickle graph : finish")
 
     print(f"Graph file: {output_graph}")
 
 
 if __name__ == "__main__":
+    print("INFO: Start load_graph_archngv.py")
     main()
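
Note on the CLI added above: main() enforces "exactly one of --circuit-name / --circuit-path" by hand and returns -1 when neither is given. argparse can express the same contract declaratively with a mutually exclusive group, which also produces the usage error automatically. A minimal sketch (same option names as above; the group itself is a suggestion, not part of this diff):

```python
# Sketch: the either/or contract of main(), enforced by argparse itself.
import argparse

parser = argparse.ArgumentParser(description="File paths for NGVCircuits and output graph.")
source = parser.add_mutually_exclusive_group(required=True)
source.add_argument("--circuit-name", type=str, help="Nexus name of the NGV circuit")
source.add_argument("--circuit-path", type=str, help="Path to the NGV circuit")
parser.add_argument("--output-graph", type=str, required=True)

# argparse rejects calls that pass both or neither of the two source options.
args = parser.parse_args(["--circuit-name", "NGV O1.v5 (Rat)", "--output-graph", "g.bin"])
print(args.circuit_name, args.output_graph)
```
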
diff --git a/examples/load_graph_archngv.sbatch b/examples/load_graph_archngv.sbatch
index d6750ae..91bcd32 100644
--- a/examples/load_graph_archngv.sbatch
+++ b/examples/load_graph_archngv.sbatch
@@ -3,7 +3,7 @@
 #SBATCH --job-name="archngv"
 #SBATCH --nodes=1
 
-#SBATCH --account=proj16
+#SBATCH --account=proj137
 #SBATCH --partition=prod
 #SBATCH --constraint=cpu
 #SBATCH --time=00:30:00
@@ -24,13 +24,21 @@
 fi
 
 source ${SETUP_SCRIPT}
 
-FILENAME_NGV="/gpfs/bbp.cscs.ch/project/proj137/NGVCircuits/rat_O1"
+FILENAME_NGV=""
 GRAPH_PATH="./data/graphs_folder/dumped_graph.bin"
 
-echo
-echo "### Loading graph"
-echo
+#CIRCUIT_NAME="tiny_real_spine_morph"
+CIRCUIT_NAME="NGV O1.v5 (Rat)"
+
 # It is imperative to use srun and dplace, otherwise the Python processes
 # do not work properly (possible deadlocks and/or performance degradation)
-time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv.py --filename_ngv ${FILENAME_NGV} --output_graph ${GRAPH_PATH}
+if [[ -z "${FILENAME_NGV}" ]]; then
+    echo "### Loading graph for circuit '${CIRCUIT_NAME}' from Nexus"
+    echo "Executing ${JOB_SCRIPT_DIR}/load_graph_archngv.py with srun"
+    time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv.py --circuit-name "${CIRCUIT_NAME}" --output-graph ${GRAPH_PATH}
+else
+    echo "### Loading graph from file: ${FILENAME_NGV}"
+    echo "Executing ${JOB_SCRIPT_DIR}/load_graph_archngv.py with srun"
+    time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv.py --circuit-path "${FILENAME_NGV}" --output-graph ${GRAPH_PATH}
+fi
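
The --circuit-name branch above only works if the Keycloak client secret is exported in the job environment, since get_nexus_token() reads it from the variable named by environ_name ("KCS" by default). A fail-fast guard at the top of main() would make a missing secret obvious in the SLURM log; a sketch (the helper name is hypothetical):

```python
# Hypothetical preflight check; "KCS" is the variable get_nexus_token() expects.
from os import environ


def check_nexus_secret(environ_name="KCS"):
    """Raise early if the Keycloak client secret is missing from the environment."""
    if environ_name not in environ:
        raise RuntimeError(
            f"{environ_name} is not set; export the Keycloak client secret "
            "before submitting the job."
        )


check_nexus_secret()
```
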
diff --git a/examples/load_graph_archngv_nexus.py b/examples/load_graph_archngv_nexus.py
new file mode 100644
index 0000000..50ccbde
--- /dev/null
+++ b/examples/load_graph_archngv_nexus.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+import argparse
+import getpass
+import multiprocessing
+import pickle
+from functools import partial
+from pathlib import Path
+
+import numpy as np
+import pandas as pd
+import psutil
+from archngv import NGVCircuit
+from joblib import Parallel, delayed, parallel_config
+from kgforge.core import KnowledgeGraphForge
+from tqdm import tqdm
+
+from astrovascpy import bloodflow
+from astrovascpy.exceptions import BloodFlowError
+from astrovascpy.utils import Graph
+
+
+def get_circuit_conf(circuit_name):
+    """
+    Retrieve the config path of an NGV circuit registered in Nexus.
+    param:
+        circuit_name(str): the Nexus name of the NGV circuit to load
+    """
+
+    TOKEN = getpass.getpass(prompt="Nexus token: ")
+
+    nexus_endpoint = "https://bbp.epfl.ch/nexus/v1"  # production environment
+
+    ORG = "bbp"
+    PROJECT = "mmb-neocortical-regions-ngv"
+
+    forge = KnowledgeGraphForge(
+        "https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/notebooks/use-cases/prod-forge-nexus.yml",
+        endpoint=nexus_endpoint,
+        bucket=f"{ORG}/{PROJECT}",
+        token=TOKEN,
+        debug=True,
+    )
+
+    p = forge.paths("Dataset")
+    resources = forge.search(p.type == "DetailedCircuit", p.name == circuit_name, limit=30)
+
+    if len(resources) != 1:
+        print(f"Error: expected one NGV circuit named '{circuit_name}', found {len(resources)}")
+        return None
+
+    circuit = resources[0]
+    circuit_config_path = circuit.circuitConfigPath.url
+    circuit_config_path = circuit_config_path[len("file://") :]
+    print(f"circuitConfigPath: {circuit_config_path}")
+
+    return circuit_config_path
+
+
+def load_graph_archngv_parallel(
+    filename, n_workers, n_astro=None, parallelization_backend="multiprocessing"
+):
+    """Load a vasculature from an NGV circuit.
+
+    Args:
+        filename (str): vasculature dataset.
+        n_workers (int): number of processes to set endfeet on edges.
+        n_astro (int): for testing, if not None, it reduces the number of astrocytes used
+        parallelization_backend (str): Either multiprocessing or joblib
+
+    Returns:
+        vasculatureAPI.PointVasculature: graph containing point vasculature skeleton.
+
+    Raises:
+        BloodFlowError: if the file object identified by filename is not in h5 format.
+    """
+    if not Path(filename).exists():
+        raise BloodFlowError("File provided does not exist")
+    circuit = NGVCircuit(filename)
+    pv = circuit.vasculature.point_graph
+    graph = Graph.from_point_vasculature(pv)
+    graph.edge_properties.index = pd.MultiIndex.from_frame(
+        graph.edge_properties.loc[:, ["section_id", "segment_id"]]
+    )
+    gv_conn = circuit.gliovascular_connectome
+    worker = partial(bloodflow.get_closest_edges, graph=graph)
+
+    args = (
+        (
+            gv_conn.vasculature_sections_segments(endfoot_id).vasculature_section_id.values[0],
+            gv_conn.vasculature_sections_segments(endfoot_id).vasculature_segment_id.values[0],
+            gv_conn.get(endfoot_id, ["endfoot_compartment_length"]).values[0],
+        )
+        for astro_id in np.arange(n_astro or circuit.astrocytes.size)
+        for endfoot_id in gv_conn.astrocyte_endfeet(astro_id)
+    )
+    endfoot_ids = [
+        endfoot_id
+        for astro_id in np.arange(n_astro or circuit.astrocytes.size)
+        for endfoot_id in gv_conn.astrocyte_endfeet(astro_id)
+    ]
+
+    if parallelization_backend == "multiprocessing":
+        with multiprocessing.Pool(n_workers) as pool:
+            for result_ids, result_endfeet in zip(
+                tqdm(
+                    pool.imap(worker, args, chunksize=max(1, int(len(endfoot_ids) / n_workers))),
+                    total=len(endfoot_ids),
+                ),
+                endfoot_ids,
+            ):
+                # Only the main process executes this part, as soon as it receives the data generated in parallel
+                graph.edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = (
+                    result_endfeet
+                )
+
+    elif parallelization_backend == "joblib":
+        with parallel_config(
+            backend="loky", prefer="processes", n_jobs=n_workers, inner_max_num_threads=1
+        ):
+            parallel = Parallel(return_as="generator", batch_size="auto")
+            parallelized_region = parallel(
+                delayed(worker)(arg) for arg in tqdm(args, total=len(endfoot_ids))
+            )
+
+            for result_ids, result_endfeet in zip(parallelized_region, endfoot_ids):
+                # Only the main process executes this part, as soon as it receives the data generated in parallel
+                graph.edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = (
+                    result_endfeet
+                )
+
+    else:
+        raise BloodFlowError(
+            f"parallelization_backend={parallelization_backend} invalid option. Use 'joblib' or 'multiprocessing'."
+        )
+
+    return graph
+
+
+def main():
+    global print
+    print = partial(print, flush=True)
+
+    parser = argparse.ArgumentParser(description="File paths for NGVCircuits and output graph.")
+    parser.add_argument("--circuit_name", type=str, required=True, help="Nexus name of the NGV circuit")
+    parser.add_argument(
+        "--output_graph", type=str, required=True, help="Path to the output graph file"
+    )
+    args = parser.parse_args()
+
+    circuit_name = args.circuit_name
+    filename_ngv = get_circuit_conf(circuit_name)
+    if filename_ngv is None:
+        print("Error: Could not obtain a valid file path for the NGV circuit")
+        return -1
+
+    output_graph = args.output_graph
+
+    n_cores = psutil.cpu_count(logical=False)
+    print(f"number of physical CPU cores = {n_cores}")
+
+    print(f"NGV Circuits file: {filename_ngv}")
+    print("loading circuit : start")
+    graph = load_graph_archngv_parallel(
+        filename_ngv, n_workers=n_cores
+    )  # n_astro=50 for debugging (smaller processing needs)
+    print("loading circuit : finish")
+
+    print("pickle graph : start")
+    with open(output_graph, "wb") as filehandler:
+        pickle.dump(graph, filehandler)
+    print("pickle graph : finish")
+    print(f"Graph file: {output_graph}")
+
+
+if __name__ == "__main__":
+    main()
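
The (section_id, segment_id) MultiIndex that load_graph_archngv_parallel() puts on graph.edge_properties is what allows each worker result to be written back with a single .loc assignment. A self-contained pandas sketch of that write-back pattern, with toy data standing in for the real circuit:

```python
# Toy illustration of the MultiIndex write-back used above; all data is made up.
import numpy as np
import pandas as pd

edge_properties = pd.DataFrame(
    {"section_id": [0, 0, 1, 1], "segment_id": [0, 1, 0, 1], "endfeet_id": -1}
)
edge_properties.index = pd.MultiIndex.from_frame(
    edge_properties.loc[:, ["section_id", "segment_id"]]
)

# A worker returns (section_id, segment_id) pairs for the edges closest to one
# endfoot; the whole slice is then labelled with that endfoot id in one call.
result_ids = np.array([[0, 1], [1, 0]])
result_endfeet = 7
edge_properties.loc[pd.MultiIndex.from_arrays(result_ids.T), "endfeet_id"] = result_endfeet

print(edge_properties)
```
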
diff --git a/examples/load_graph_archngv_nexus.sbatch b/examples/load_graph_archngv_nexus.sbatch
new file mode 100755
index 0000000..3a59a28
--- /dev/null
+++ b/examples/load_graph_archngv_nexus.sbatch
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#SBATCH --job-name="archngv"
+#SBATCH --nodes=1
+
+#SBATCH --account=proj137
+#SBATCH --partition=prod
+#SBATCH --constraint=cpu
+#SBATCH --time=00:40:00
+
+
+JOB_SCRIPT=$(scontrol show job ${SLURM_JOB_ID} | awk -F= '/Command=/{print $2}')
+JOB_SCRIPT_DIR=$(dirname ${JOB_SCRIPT})
+
+SETUP_SCRIPT="${JOB_SCRIPT_DIR}/../setup.sh"
+if [[ ! -f ${SETUP_SCRIPT} ]]; then
+    >&2 echo "[ERROR] The 'setup.sh' script could not be found!"
+    exit 2
+fi
+
+source ${SETUP_SCRIPT}
+
+CIRCUIT_NAME="NGV O1.v5 (Rat)"
+
+GRAPH_PATH="./data/graphs_folder/dumped_graph_nexus.bin"
+
+echo
+echo "### Loading graph"
+echo
+# It is imperative to use srun and dplace, otherwise the Python processes
+# do not work properly (possible deadlocks and/or performance degradation)
+time srun -n 1 --mpi=none dplace python ${JOB_SCRIPT_DIR}/load_graph_archngv_nexus.py --circuit_name "${CIRCUIT_NAME}" --output_graph ${GRAPH_PATH}
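
Once the job above completes, the dumped graph can be sanity-checked from any Python session before it is fed into the bloodflow simulations. A minimal sketch, assuming only that the pickle holds the astrovascpy.utils.Graph built by the loader (node_properties/edge_properties being the usual point-vasculature DataFrames):

```python
# Minimal sanity check of the pickled graph produced by the sbatch job above.
import pickle

GRAPH_PATH = "./data/graphs_folder/dumped_graph_nexus.bin"  # path used in the job script

with open(GRAPH_PATH, "rb") as fh:
    graph = pickle.load(fh)

print(f"nodes: {len(graph.node_properties)}, edges: {len(graph.edge_properties)}")
print("endfeet_id column present:", "endfeet_id" in graph.edge_properties.columns)
```
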
diff --git a/setup.sh b/setup.sh
index d666bba..5955b93 100755
--- a/setup.sh
+++ b/setup.sh
@@ -28,7 +28,7 @@
 else
     conda install -y pip
     conda install -y -c conda-forge mpi mpi4py petsc petsc4py
-    "$CONDA_PREFIX/bin/pip" install tox joblib archngv
+    "$CONDA_PREFIX/bin/pip" install tox joblib archngv nexusforge
     # If complex number support is needed
     #conda install -y -c conda-forge mpi mpi4py "petsc=*=*complex*" "petsc4py=*=*complex*"
 fi
@@ -65,7 +65,7 @@
     then
        python3 -m pip install --upgrade pip
     fi
    pip3 install -e ${SETUP_DIR}
-   pip3 install tox joblib archngv
+   pip3 install tox joblib archngv nexusforge
 else
    conda_bin=`conda info | grep "active env location" | grep -o "/.*"`/bin
    $conda_bin/pip install -e ${SETUP_DIR}
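
One installation gotcha worth noting: the PyPI distribution added to setup.sh is called nexusforge, but the module it provides is imported as kgforge (as in the scripts above). A quick smoke test of the freshly built environment:

```python
# Verify the environment built by setup.sh: nexusforge installs the kgforge module.
import importlib

for module in ("kgforge.core", "archngv", "astrovascpy"):
    importlib.import_module(module)
    print(f"import ok: {module}")
```
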