diff --git a/.kokoro/docker/Dockerfile b/.kokoro/docker/Dockerfile index e2d74d172dc..6ba09ead867 100644 --- a/.kokoro/docker/Dockerfile +++ b/.kokoro/docker/Dockerfile @@ -46,6 +46,7 @@ RUN apt-get update \ liblzma-dev \ libmagickwand-dev \ libmemcached-dev \ + libpcsclite-dev \ libpython3-dev \ libreadline-dev \ libsnappy-dev \ diff --git a/.kokoro/tests/run_tests.sh b/.kokoro/tests/run_tests.sh index 1715decdce7..dc01559a106 100755 --- a/.kokoro/tests/run_tests.sh +++ b/.kokoro/tests/run_tests.sh @@ -84,6 +84,9 @@ fi cd "${PROJECT_ROOT}" +# # add libpcsclite +# sudo apt install -q -y -s libpcsclite-dev + # add user's pip binary path to PATH export PATH="${HOME}/.local/bin:${PATH}" diff --git a/kms/singletenanthsm/README.rst b/kms/singletenanthsm/README.rst new file mode 100644 index 00000000000..7a252415f92 --- /dev/null +++ b/kms/singletenanthsm/README.rst @@ -0,0 +1,120 @@ +Google Cloud Key Management Service Python Samples +=============================================================================== + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=kms/singletenanthsm/README.rst + + +This directory contains samples for Google Cloud Key Management Service. The `Cloud Key Management Service`_ allows you to create, import, and manage cryptographic keys and perform cryptographic operations in a single centralized cloud service. + + + + +.. _Cloud Key Management Service: https://cloud.google.com/kms/docs/ + + + + + +Setup +------------------------------------------------------------------------------- + + +Install Dependencies +++++++++++++++++++++ + +#. Clone python-kms and change directory to the sample directory you want to use. + + .. code-block:: bash + + $ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git + +#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions. + + .. _Python Development Environment Setup Guide: + https://cloud.google.com/python/setup + +#. Install `libpcsclite-dev` if you do not already have it. + + .. code-block:: bash + + $ sudo apt install libpcsclite-dev + +#. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+. + + .. code-block:: bash + + $ virtualenv env + $ source env/bin/activate + +#. Install the dependencies needed to run the samples. + + .. code-block:: bash + + $ pip install -r requirements.txt + +.. _pip: https://pip.pypa.io/ +.. _virtualenv: https://virtualenv.pypa.io/ + +Samples +------------------------------------------------------------------------------- + +Create a custom gcloud build to access the Single Tenant HSM service. ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + + +This application creates a custom gcloud build to access the single tenant HSM service. The operation can be specified depending if the user +wants to just generate rsa keys on all connected yubikeys(generate_rsa_keys), just generate the custom gcloud build to access the +single-tenant-hsm(build_custom_gcloud), or both generate keys and the custom gcloud build(generate_gcloud_and_keys). Yubikeys will need to be connected +to run the `generate_rsa_keys` and `generate_gcloud_and_keys` operations. + + +.. code-block:: bash + + $ python3 setup.py + + usage: setup.py [-h] [--operation] {build_custom_gcloud,generate_rsa_keys,generate_gcloud_and_keys} + + positional arguments: + operation The type of setup operation you want to perform. This includes build_custom_gcloud','generate_rsa_keys','generate_gcloud_and_keys'. + + optional arguments: + -h, --help show this help message and exit + + # Below is an example of using the setup command to generate rsa private keys and the custom gcloud build: + + $ python3 setup.py --operation=generate_gcloud_and_keys + + + + +Approves a Single Tenant HSM Instance Proposal. ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +To run this sample: + +.. code-block:: bash + + $ python3 approve_proposal.py + + usage: approve_proposal.py [-h] [--proposal_resource PROPOSAL_RESOURCE] + + This application fetches and approves the single tenant HSM instance proposal + specified in the "proposal_resource" field. + + For more information, visit https://cloud.google.com/kms/docs/attest-key. + + positional arguments: + --proposal_resource PROPOSAL_RESOURCE + The full name of the single tenant HSM instance proposal that needs to be approved. + + optional arguments: + -h, --help show this help message and exit + + # Below is an example of using the approve script to fetch the challenges, sign the challenges, and send the signed challenges + # associated with the proposal 'my_proposal': + + $ python3 approve_proposal.py --proposal_resource=projects/my-project/locations/us-east1/singleTenantHsmInstances/mysthi/proposals/my_proposal + + + +.. _Google Cloud SDK: https://cloud.google.com/sdk/ diff --git a/kms/singletenanthsm/approve_proposal.py b/kms/singletenanthsm/approve_proposal.py new file mode 100644 index 00000000000..63c6b940e22 --- /dev/null +++ b/kms/singletenanthsm/approve_proposal.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import json +import logging +import os +import sys +from typing import List + +import gcloud_commands +import ykman_utils + + +def make_directory(directory_path: str) -> None: + """Creates a directory with the passed in path if it does not already exist. + + Args: + directory_path: The path of the directory to be created. + + Returns: + None + """ + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + logger.info("Parsing challenges into files") + if not os.path.exists(directory_path): + os.mkdir(directory_path) + logger.info(f"Directory '{directory_path}' created.") + else: + logger.info(f"Directory '{directory_path}' already exists.") + + +def parse_challenges_into_files(sthi_output: str) -> List[bytes]: + """Parses the STHI output and writes the challenges and public keys to files. + + Args: + sthi_output: The output of the STHI command. + + Returns: + A list of the unsigned challenges. + """ + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + + logger.info("Parsing challenges into files") + proposal_json = json.loads(sthi_output, strict=False) + challenges = proposal_json["quorumParameters"]["challenges"] + + make_directory("challenges") + + challenge_count = 0 + unsigned_challenges = [] + for challenge in challenges: + challenge_count += 1 + try: + with open("challenges/challenge{0}.txt".format(challenge_count), "wb") as f: + binary_challenge = ykman_utils.urlsafe_base64_to_binary( + challenge["challenge"] + ) + f.write(binary_challenge) + except FileNotFoundError: + logger.exception( + f"File not found: challenges/challenge{challenge_count}.txt" + ) + except Exception as e: + logger.exception(f"An error occurred: {e}") + try: + with open("challenges/public_key{0}.pem".format(challenge_count), "w") as f: + f.write( + challenge["publicKeyPem"].encode("utf-8").decode("unicode_escape") + ) + except FileNotFoundError: + logger.exception( + f"File not found: challenges/public_key{challenge_count}.txt" + ) + except Exception as e: + logger.exception(f"An error occurred: {e}") + unsigned_challenges.append( + ykman_utils.Challenge(binary_challenge, challenge["publicKeyPem"]) + ) + + return unsigned_challenges + + +def parse_args(args): + parser = argparse.ArgumentParser() + parser.add_argument("--proposal_resource", type=str, required=True) + parser.add_argument( + "--management_key", + type=str, + required=False, + ) + parser.add_argument( + "--pin", + type=str, + required=False, + ) + return parser.parse_args(args) + + +def signed_challenges_to_files( + challenge_replies: list[ykman_utils.ChallengeReply], +) -> None: + """Writes the signed challenges and public keys to files. + + Args: + challenge_replies: A list of ChallengeReply objects. + + Returns: + None + """ + signed_challenge_files = [] + challenge_count = 0 + + for challenge_reply in challenge_replies: + challenge_count += 1 + make_directory("signed_challenges") + with open( + f"signed_challenges/public_key_{challenge_count}.pem", "w" + ) as public_key_file: + + # Write public key to file + public_key_file.write(challenge_reply.public_key_pem) + with open( + f"signed_challenges/signed_challenge{challenge_count}.bin", "wb" + ) as binary_file: + + # Write signed challenge to file + binary_file.write(challenge_reply.signed_challenge) + signed_challenge_files.append( + ( + f"signed_challenges/signed_challenge{challenge_count}.bin", + f"signed_challenges/public_key_{challenge_count}.pem", + ) + ) + return signed_challenge_files + + +def approve_proposal(): + """Approves a proposal by fetching challenges, signing them, and sending them back to gcloud.""" + parser = parse_args(sys.argv[1:]) + + # Fetch challenges + process = gcloud_commands.fetch_challenges(parser.proposal_resource) + + # Parse challenges into files + unsigned_challenges = parse_challenges_into_files(process.stdout) + + # Sign challenges + signed_challenges = ykman_utils.sign_challenges( + challenges=unsigned_challenges, + management_key=parser.management_key, + pin=parser.pin, + ) + + # Parse signed challenges into files + signed_challenged_files = signed_challenges_to_files(signed_challenges) + + # Return signed challenges to gcloud + gcloud_commands.send_signed_challenges( + signed_challenged_files, parser.proposal_resource + ) + + +if __name__ == "__main__": + approve_proposal() diff --git a/kms/singletenanthsm/approve_proposal_test.py b/kms/singletenanthsm/approve_proposal_test.py new file mode 100644 index 00000000000..8808d037496 --- /dev/null +++ b/kms/singletenanthsm/approve_proposal_test.py @@ -0,0 +1,247 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + +from dataclasses import dataclass + +import json + +import os + +from cryptography.hazmat.primitives.serialization import ( + Encoding, + PublicFormat, +) + +import approve_proposal + +import ykman_fake +import ykman_utils + + +sample_sthi_output = """ +{ + "quorumParameters": { + "challenges": [ + { + "challenge": "tiOz64M_rJ34yOvweHBBltRrm3k34bou4m2JKlz9BmhrR7yU6S6ram8o1VQhyPU1", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3WK/NpZ4DJ68lOR7JINL\nyODwrRanATJNepJi1LYDDO4ZqQvaOvbv8RR47YBlHYAwEDuUC0Vy9g03T0G7V/TV\nTFNQU+I2wIm6VQFFbhjFYYCECILHPNwRp8XN0VKSiTqj5ilPa2wdPsBEgwNKlILn\nv9iTx9IdyFeMmCqIWgeFX5sHddvgq5Dep7kBRVh7ZM1+hOS8kw2qmZgKX8Zwgz3E\n0En/2r+3YgWtMxTz6iqW/Op0UagrlR5EgysjrNgakJEJQA/x23SataJOpVvSE9pH\nSCyzrIaseg1gtz5huDVO5GOK3Xg/VUr2n3sk98MQtHWWaEfcpstSrrefjTC4IYN5\n2QIDAQAB\n-----END PUBLIC KEY-----\n" + }, + { + "challenge": "6bfZOoD9L35qO1GIzVHcv9sX0UEzKCTru8yz1U7NK4o7y0gnXoU3Ak47sFFY4Yzb", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwpxT5iX72pkd/m8Fb3mg\nMkCQMoWb3FKAjHsutKpUEMA0ts1atZe7WFBRcCxV2mTDeWFpSwWjuYYSNNrEgk9e\nBRiLJ/36hCewnzw9PZMPcnWv+QLbyLsr4jAEVHk2pWln2HkVbAmK2OWEhvlUjxyT\nfB0b1UsBP3uy5f+SLb8iltvwWZGauT64JrLpbIwhk6SbXOCZSZtsXVZ5mVPEIxik\nZ4iBT3r+9Fc3fgKN/16bjdHw+qbWxovEYejG10Yp1yO4QjSzkxQsXTFvsWxaTKF2\ncZa5GF19b9ZkY3SRxHF6emA720F+N4oeGuV0Zu/ACYfMqRUSkh5GiOpv6VxvuXRD\n0wIDAQAB\n-----END PUBLIC KEY-----\n" + }, + { + "challenge": "NNH3Pt3F-OvaeYR_Dynp_nbHMuLaVYBnkG7uJtwz2-lShyLaHNjOyjBnL-eGjoRY", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsrrPGkxbk08x5CpkUk5y\nfWBmfiE4qU4IWaSO9HCBv5uRWJvDqXkKjkcBptwmGFsnzT+owfSe+21nWLOLZqwW\nmPbV0bW3e7l3ZUw/4fUga+KJDR5OfkkXWSos1cEMhxsSMnGykhx2/ge9bqY0Edbr\nzckOT2un87ThdawveS3hOxTczE+JcgzoI+CUxlPV0c9yJ5iNFZXf1p7wj3Rq2I8X\nAl4XyMP/+0TLR5+UTrrxLC4ds4m9EjMPRv4aNJFqzBfb3WBM/DFVvNR82Mt2pfF8\nlv6RyZU/vls6vjDl42NK3hckOhEGqQpPmifKgPCaOwdLHg68CjQZ54GWGqyFGzNx\nHwIDAQAB\n-----END PUBLIC KEY-----\n" + } + ], + "requiredApproverCount": 3 + } +} + +""" + + +sample_nonces = [ + "NNH3Pt3F-OvaeYR_Dynp_nbHMuLaVYBnkG7uJtwz2-lShyLaHNjOyjBnL-eGjoRY", + "tiOz64M_rJ34yOvweHBBltRrm3k34bou4m2JKlz9BmhrR7yU6S6ram8o1VQhyPU1", + "6bfZOoD9L35qO1GIzVHcv9sX0UEzKCTru8yz1U7NK4o7y0gnXoU3Ak47sFFY4Yzb", +] + + +@dataclass +class QuorumParameters: + challenges: list[ykman_utils.Challenge] + + +mock_signed_challenges = [] + +test_resource = "projects/my-project/locations/us-east1/singleTenantHsmInstances/mysthi/proposals/my_proposal" +# mock_completed_process = subprocess.CompletedProcess + + +def public_key_to_pem(public_key): + public_key_pem = public_key.public_bytes( + encoding=Encoding.PEM, format=PublicFormat.SubjectPublicKeyInfo + ).decode("utf-8") + print("PUBLIC KEY--------------") + print(public_key_pem) + return public_key_pem + + +def create_json(public_key_pem_1, public_key_pem_2, public_key_pem_3): + + my_json_string = json.dumps( + { + "quorumParameters": { + "challenges": [ + { + "challenge": "tiOz64M_rJ34yOvweHBBltRrm3k34bou4m2JKlz9BmhrR7yU6S6ram8o1VQhyPU1", + "publicKeyPem": public_key_pem_1, + }, + { + "challenge": "6bfZOoD9L35qO1GIzVHcv9sX0UEzKCTru8yz1U7NK4o7y0gnXoU3Ak47sFFY4Yzb", + "publicKeyPem": public_key_pem_2, + }, + { + "challenge": "NNH3Pt3F-OvaeYR_Dynp_nbHMuLaVYBnkG7uJtwz2-lShyLaHNjOyjBnL-eGjoRY", + "publicKeyPem": public_key_pem_3, + }, + ], + "requiredApproverCount": 3, + } + } + ) + return my_json_string + + +def create_fake_fetch_response(num_keys=3): + """ + Generates a fake fetch response with a specified number of RSA key pairs. + + Args: + num_keys: The number of RSA key pairs to generate. + + Returns: + A tuple containing: + - A JSON object with the public keys. + - A dictionary mapping public key PEMs to private keys. + """ + pub_to_priv_key = {} + public_key_pems = [] + + for _ in range(num_keys): + private_key, public_key = ykman_fake.generate_rsa_keys() + public_key_pem = public_key_to_pem(public_key) + pub_to_priv_key[public_key_pem] = private_key + public_key_pems.append(public_key_pem) + + challenge_json = create_json(*public_key_pems) # Use * to unpack the list + return challenge_json, pub_to_priv_key + + +def sign_challenges_with_capture( + challenges: list[ykman_utils.Challenge], pub_to_priv_key +): + signed_challenges = [] + for challenge in challenges: + private_key = pub_to_priv_key[challenge.public_key_pem] + signed_challenge = ykman_fake.sign_data(private_key, challenge.challenge) + signed_challenges.append( + ykman_utils.ChallengeReply( + challenge.challenge, signed_challenge, challenge.public_key_pem + ) + ) + mock_signed_challenges.extend(signed_challenges) + return signed_challenges + + +def verify_with_fake(pub_to_priv_key, signed_challenges): + for signed_challenge in signed_challenges: + priv_key = pub_to_priv_key[signed_challenge.public_key_pem] + assert ykman_fake.verify_signature( + priv_key.public_key(), + signed_challenge.unsigned_challenge, + signed_challenge.signed_challenge, + ) + print("Signed verified successfully") + + +def test_get_challenges_mocked(mocker, monkeypatch): + # Verify signed challenges + monkeypatch.setattr( + "gcloud_commands.send_signed_challenges", + lambda signed_challenges, proposal_resource: verify_with_fake( + pub_to_priv_key, mock_signed_challenges + ), + ) + + # monkeypatch sign challenges + monkeypatch.setattr( + "ykman_utils.sign_challenges", + lambda challenges, **kwargs: sign_challenges_with_capture( + challenges, pub_to_priv_key + ), + ) + + # mock the challenge string returned by service + challenge_json, pub_to_priv_key = create_fake_fetch_response() + mock_response = mocker.MagicMock() + mock_response.stdout = challenge_json + mocker.patch("subprocess.run", return_value=mock_response) + + # monkeypatch parse args + monkeypatch.setattr( + "approve_proposal.parse_args", + lambda args: argparse.Namespace( + proposal_resource="test_resource", + management_key="test_management_key", + pin="123456", + ), + ) + approve_proposal.approve_proposal() + + # assert challenge files created + challenge_files = [ + "challenges/challenge1.txt", + "challenges/challenge2.txt", + "challenges/challenge3.txt", + ] + for file_path in challenge_files: + assert os.path.exists( + file_path + ), f"File '{file_path}' should exist but does not." + + +if __name__ == "__main__": + # Parse challenges into files + unsigned_challenges = approve_proposal.parse_challenges_into_files( + sample_sthi_output + ) + created_signed_files = [ + "signed_challenges/signed_challenge1.txt", + "signed_challenges/signed_challenge2.txt", + "signed_challenges/signed_challenge3.txt", + ] + for file_path in created_signed_files: + assert os.path.exists( + file_path + ), f"File '{file_path}' should exist but does not." + + # assert signed challenge files created + signed_challenge_files = [ + "signed_challenges/signed_challenge1.txt", + "signed_challenges/signed_challenge2.txt", + "signed_challenges/signed_challenge3.txt", + ] + for file_path in signed_challenge_files: + assert os.path.exists( + file_path + ), f"File '{file_path}' should exist but does not." + + # Parse files into challenge list + challenges = ykman_utils.populate_challenges_from_files() + for challenge in challenges: + print(challenge.challenge) + print(challenge.public_key_pem) + unsigned_challenges.append(challenge.challenge) + signed_challenged_files = [] + signed_challenges = ykman_utils.sign_challenges(challenges, signed_challenged_files) + for signed_challenge in signed_challenges: + print(signed_challenge.signed_challenge) + print(signed_challenge.public_key_pem) + print("--challenge_replies=" + str(signed_challenged_files)) + ykman_utils.verify_challenge_signatures(signed_challenges) diff --git a/kms/singletenanthsm/gcloud_commands.py b/kms/singletenanthsm/gcloud_commands.py new file mode 100644 index 00000000000..f39e7b61a3e --- /dev/null +++ b/kms/singletenanthsm/gcloud_commands.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import subprocess + +command_build_custom_gcloud = """ + pushd /tmp + curl -o installer.sh https://sdk.cloud.google.com + chmod +x installer.sh + ./installer.sh --disable-prompts --install-dir ~/sthi + rm installer.sh + popd + alias sthigcloud=~/sthi/google-cloud-sdk/bin/gcloud + sthigcloud auth login + """ + + +command_add_components = """ + ~/sthi/google-cloud-sdk/bin/gcloud components repositories add https://storage.googleapis.com/single-tenant-hsm-private/components-2.json + ~/sthi/google-cloud-sdk/bin/gcloud components update + """ + + +def build_custom_gcloud(): + """Builds a custom gcloud binary.""" + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + + try: + print("\nBuilding custom gcloud build") + process = subprocess.run( + command_build_custom_gcloud, + check=True, + shell=True, + capture_output=True, + text=True, + ) + logger.info(f"Return Code: {process.returncode}") + logger.info(f"Standard Error: {process.stderr}") + logger.info("gcloud build executed successfully.") + logger.info(process.stdout) + except subprocess.CalledProcessError as e: + logger.exception(f"gcloud build failed: {e}") + raise subprocess.CalledProcessError(e.returncode, e.cmd, e.output, e.stderr) + try: + print("\nAdding gcloud components") + process = subprocess.run( + command_add_components, + check=True, + shell=True, + capture_output=True, + text=True, + ) + logger.info(f"Return Test: {process}") + logger.info(f"Return Code: {process.returncode}") + logger.info(f"Standard Output: {process.stdout}") + logger.info(f"Standard Error: {process.stderr}") + logger.info("gcloud components add executed successfully.") + logger.info(process.stdout) + return process + except subprocess.CalledProcessError as e: + logger.info(f"Error executing gcloud components update: {e}") + raise subprocess.CalledProcessError(e.returncode, e.cmd, e.output, e.stderr) + + +command_gcloud_list_proposal = ( + "~/sthi/google-cloud-sdk/bin/gcloud kms single-tenant-hsm list " + "--location=projects/hawksbill-playground/locations/global" +) + +command_gcloud_describe_proposal = """ + ~/sthi/google-cloud-sdk/bin/gcloud \ + kms single-tenant-hsm proposal describe """ + + +def fetch_challenges(sthi_proposal_resource: str): + """Fetches challenges from the server.""" + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + try: + print("\nfetching challenges") + process = subprocess.run( + command_gcloud_describe_proposal + + sthi_proposal_resource + + " --format=json", + capture_output=True, + check=True, + text=True, + shell=True, + # stderr=subprocess.STDOUT + ) + logger.info(f"Return Test: {process}") + logger.info(f"Return Code: {process.returncode}") + logger.info(f"Standard Output: {process.stdout}") + logger.info(f"Standard Error: {process.stderr}") + logger.info("gcloud command executed successfully.") + logger.info(process.stdout) + return process + except subprocess.CalledProcessError as e: + logger.exception(f"Fetching challenges failed: {e}") + raise subprocess.CalledProcessError(e.returncode, e.cmd, e.output, e.stderr) + + +command_gcloud_approve_proposal = [ + "~/sthi/google-cloud-sdk/bin/gcloud", + "kms", + "single-tenant-hsm", + "proposal", + "approve", +] + + +def send_signed_challenges(signed_challenged_files: list[str], proposal_resource: str): + """Sends signed challenges to the server.""" + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + if signed_challenged_files is None or not signed_challenged_files: + raise ValueError("signed_challenged_files is empty") + print("Sending signed challenges") + signed_challenge_str = '--challenge_replies="' + str(signed_challenged_files) + '"' + command_str = " ".join( + command_gcloud_approve_proposal + [proposal_resource] + [signed_challenge_str] + ) + logger.info(command_str) + + try: + process = subprocess.run( + command_str, + capture_output=True, + check=True, + text=True, + shell=True, + ) + logger.info(f"Return Test: {process}") + logger.info(f"Return Code: {process.returncode}") + logger.info(f"Standard Output: {process.stdout}") + logger.info(f"Standard Error: {process.stderr}") + logger.info("gcloud command executed successfully.") + return process + + except subprocess.CalledProcessError as e: + logger.exception(f"Sending signed challenges failed: {e}") + raise subprocess.CalledProcessError(e.returncode, e.cmd, e.output, e.stderr) diff --git a/kms/singletenanthsm/gcloud_commands_test.py b/kms/singletenanthsm/gcloud_commands_test.py new file mode 100644 index 00000000000..5b5ca34b67c --- /dev/null +++ b/kms/singletenanthsm/gcloud_commands_test.py @@ -0,0 +1,321 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +from unittest import mock + +import pytest + +import gcloud_commands + + +test_proposal_resource = """projects/test_project/locations/\ +us-central1/singleTenantHsmInstances/my_sthi/proposals/my_proposal + """ +sample_fetch_challenge_output = """ +{ + "quorumParameters": { + "challenges": [ + { + "challenge": "tiOz64M_rJ34yOvweHBBltRrm3k34bou4m2JKlz9BmhrR7yU6S6ram8o1VQhyPU1", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3WK/NpZ4DJ68lOR7JINL\nyODwrRanATJNepJi1LYDDO4ZqQvaOvbv8RR47YBlHYAwEDuUC0Vy9g03T0G7V/TV\nTFNQU+I2wIm6VQFFbhjFYYCECILHPNwRp8XN0VKSiTqj5ilPa2wdPsBEgwNKlILn\nv9iTx9IdyFeMmCqIWgeFX5sHddvgq5Dep7kBRVh7ZM1+hOS8kw2qmZgKX8Zwgz3E\n0En/2r+3YgWtMxTz6iqW/Op0UagrlR5EgysjrNgakJEJQA/x23SataJOpVvSE9pH\nSCyzrIaseg1gtz5huDVO5GOK3Xg/VUr2n3sk98MQtHWWaEfcpstSrrefjTC4IYN5\n2QIDAQAB\n-----END PUBLIC KEY-----\n" + }, + { + "challenge": "6bfZOoD9L35qO1GIzVHcv9sX0UEzKCTru8yz1U7NK4o7y0gnXoU3Ak47sFFY4Yzb", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwpxT5iX72pkd/m8Fb3mg\nMkCQMoWb3FKAjHsutKpUEMA0ts1atZe7WFBRcCxV2mTDeWFpSwWjuYYSNNrEgk9e\nBRiLJ/36hCewnzw9PZMPcnWv+QLbyLsr4jAEVHk2pWln2HkVbAmK2OWEhvlUjxyT\nfB0b1UsBP3uy5f+SLb8iltvwWZGauT64JrLpbIwhk6SbXOCZSZtsXVZ5mVPEIxik\nZ4iBT3r+9Fc3fgKN/16bjdHw+qbWxovEYejG10Yp1yO4QjSzkxQsXTFvsWxaTKF2\ncZa5GF19b9ZkY3SRxHF6emA720F+N4oeGuV0Zu/ACYfMqRUSkh5GiOpv6VxvuXRD\n0wIDAQAB\n-----END PUBLIC KEY-----\n" + }, + { + "challenge": "NNH3Pt3F-OvaeYR_Dynp_nbHMuLaVYBnkG7uJtwz2-lShyLaHNjOyjBnL-eGjoRY", + "publicKeyPem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsrrPGkxbk08x5CpkUk5y\nfWBmfiE4qU4IWaSO9HCBv5uRWJvDqXkKjkcBptwmGFsnzT+owfSe+21nWLOLZqwW\nmPbV0bW3e7l3ZUw/4fUga+KJDR5OfkkXWSos1cEMhxsSMnGykhx2/ge9bqY0Edbr\nzckOT2un87ThdawveS3hOxTczE+JcgzoI+CUxlPV0c9yJ5iNFZXf1p7wj3Rq2I8X\nAl4XyMP/+0TLR5+UTrrxLC4ds4m9EjMPRv4aNJFqzBfb3WBM/DFVvNR82Mt2pfF8\nlv6RyZU/vls6vjDl42NK3hckOhEGqQpPmifKgPCaOwdLHg68CjQZ54GWGqyFGzNx\nHwIDAQAB\n-----END PUBLIC KEY-----\n" + } + ], + "requiredApproverCount": 3 + } +} + +""" + + +# Test case 1: Successful build and components add +def test_build_custom_gcloud_success(mock_subprocess_run): + # Setup: Mock successful gcloud execution + mock_subprocess_run.side_effect = [ + subprocess.CompletedProcess( + args=gcloud_commands.command_build_custom_gcloud, + returncode=0, + stdout="gcloud build successful!", + stderr="", + ), + subprocess.CompletedProcess( + args=gcloud_commands.command_add_components, + returncode=0, + stdout="gcloud components add successful.", + stderr="", + ), + ] + + # Action: Call the function + result = gcloud_commands.build_custom_gcloud() + + # Assert: Verify the return value and that subprocess.run was called correctly + assert result.returncode == 0 + assert result.stdout == "gcloud components add successful." + assert mock_subprocess_run.call_count == 2 + mock_subprocess_run.assert_has_calls( + [ + mock.call( + gcloud_commands.command_build_custom_gcloud, + check=True, + shell=True, + capture_output=True, + text=True, + ), + mock.call( + gcloud_commands.command_add_components, + check=True, + shell=True, + capture_output=True, + text=True, + ), + ] + ) + + +# Test case 2: gcloud build fails +def test_build_custom_gcloud_build_error(mock_subprocess_run): + # Setup: Mock gcloud build command with a non-zero return code + mock_subprocess_run.side_effect = subprocess.CalledProcessError( + returncode=1, + cmd=gcloud_commands.command_build_custom_gcloud, + output="", + stderr="Error: Build failed", + ) + + # Action & Assert: Call the function and verify that the + # CalledProcessError is re-raised + with pytest.raises(subprocess.CalledProcessError) as exc_info: + gcloud_commands.build_custom_gcloud() + + assert exc_info.value.returncode == 1 + assert exc_info.value.stderr == "Error: Build failed" + assert exc_info.value.cmd == gcloud_commands.command_build_custom_gcloud + assert mock_subprocess_run.call_count == 1 + + +# Test case 3: gcloud components add fails +def test_build_custom_gcloud_components_error(mock_subprocess_run): + # Setup: Mock gcloud build success and components add with error + mock_subprocess_run.side_effect = [ + subprocess.CompletedProcess( + args=gcloud_commands.command_build_custom_gcloud, + returncode=0, + stdout="gcloud build successful!", + stderr="", + ), + subprocess.CalledProcessError( + returncode=1, + cmd=gcloud_commands.command_add_components, + output="", + stderr="Error: Components add failed", + ), + ] + + # Action & Assert: Call the function and verify that the + # CalledProcessError is re-raised + with pytest.raises(subprocess.CalledProcessError) as exc_info: + gcloud_commands.build_custom_gcloud() + + assert exc_info.value.returncode == 1 + assert exc_info.value.stderr == "Error: Components add failed" + assert exc_info.value.cmd == gcloud_commands.command_add_components + assert mock_subprocess_run.call_count == 2 + + +@pytest.fixture +def mock_subprocess_run(monkeypatch): + mock_run = mock.create_autospec(subprocess.run) + monkeypatch.setattr(subprocess, "run", mock_run) + return mock_run + + +def test_fetch_challenges_success(mock_subprocess_run): + # Setup: Configure the mock to simulate a successful gcloud command + mock_process_result = subprocess.CompletedProcess( + args=[], + returncode=0, + stdout=sample_fetch_challenge_output, + stderr="", + ) + mock_subprocess_run.return_value = mock_process_result + + # Action: Call the function + resource = test_proposal_resource + result = gcloud_commands.fetch_challenges(resource) + + # Assertions: Verify the results + mock_subprocess_run.assert_called_once_with( + gcloud_commands.command_gcloud_describe_proposal + resource + " --format=json", + capture_output=True, + check=True, + text=True, + shell=True, + ) + assert result == mock_process_result + assert result.returncode == 0 + assert result.stdout == sample_fetch_challenge_output + assert not result.stderr + + +def test_fetch_challenges_error(mock_subprocess_run): + # Setup: Configure the mock to simulate a failed gcloud command + mock_subprocess_run.side_effect = subprocess.CalledProcessError( + returncode=1, cmd="", output="", stderr="Error: Invalid resource" + ) + + # Action & Assert: Call the function and check for the expected exception + resource = "invalid-resource" + with pytest.raises(subprocess.CalledProcessError) as exc_info: + gcloud_commands.fetch_challenges(resource) + + # Verify the exception details + assert exc_info.value.returncode == 1 + assert exc_info.value.stderr == "Error: Invalid resource" + + +def test_fetch_challenges_command_construction(mock_subprocess_run): + # Setup: + mock_process_result = subprocess.CompletedProcess( + args=[], + returncode=0, + stdout="{}", + stderr="", + ) + mock_subprocess_run.return_value = mock_process_result + resource = test_proposal_resource + + # Action: Call the function + gcloud_commands.fetch_challenges(resource) + + # Assertions: Verify the command + mock_subprocess_run.assert_called_once_with( + gcloud_commands.command_gcloud_describe_proposal + resource + " --format=json", + capture_output=True, + check=True, + text=True, + shell=True, + ) + + +def test_fetch_challenges_output_capture(mock_subprocess_run): + # Setup: + expected_stdout = "Expected Output" + expected_stderr = "Expected Error" + expected_returncode = 0 + mock_process_result = subprocess.CompletedProcess( + args=[], + returncode=expected_returncode, + stdout=expected_stdout, + stderr=expected_stderr, + ) + mock_subprocess_run.return_value = mock_process_result + resource = test_proposal_resource + # Action: Call the function + result = gcloud_commands.fetch_challenges(resource) + + # Assertions: Verify the captured output + assert result.stdout == expected_stdout + assert result.stderr == expected_stderr + assert result.returncode == expected_returncode + + +# Test case 1: Successful gcloud command +def test_send_signed_challenges_success(mock_subprocess_run): + # Setup: Mock successful gcloud execution + signed_files = [("signed_challenge.bin", "public_key_1.pem")] + proposal = "my-proposal" + mock_subprocess_run.return_value = subprocess.CompletedProcess( + args=[], # Not checked in this test, but good practice to include + returncode=0, + stdout="gcloud command successful!", + stderr="", + ) + + # Action: Call the function + result = gcloud_commands.send_signed_challenges(signed_files, proposal) + + # Assert: Verify the return value and that subprocess.run was called correctly + assert result.returncode == 0 + assert result.stdout == "gcloud command successful!" + expected_command = " ".join( + gcloud_commands.command_gcloud_approve_proposal + + [proposal] + + ["--challenge_replies=\"[('signed_challenge.bin'," " 'public_key_1.pem')]\""] + ) + mock_subprocess_run.assert_called_once_with( + expected_command, + capture_output=True, + check=True, + text=True, + shell=True, + ) + + +# Test case 2: gcloud command returns an error code +def test_send_signed_challenges_gcloud_error(mock_subprocess_run): + # Setup: Mock gcloud command with a non-zero return code and stderr + signed_files = [("signed_challenge.bin", "public_key_1.pem")] + proposal = "my-proposal" + mock_subprocess_run.return_value = subprocess.CompletedProcess( + args=[], + returncode=1, + stdout="", + stderr="Error: Invalid proposal resource", + ) + + # Action: Call the function + result = gcloud_commands.send_signed_challenges(signed_files, proposal) + + # Assert: Verify the return value + assert result.returncode == 1 + assert result.stderr == "Error: Invalid proposal resource" + + +# Test case 3: subprocess.run raises a CalledProcessError +def test_send_signed_challenges_called_process_error(mock_subprocess_run): + # Setup: Mock subprocess.run to raise a CalledProcessError + signed_files = [("signed_challenge.bin", "public_key_1.pem")] + proposal = "my-proposal" + mock_subprocess_run.side_effect = subprocess.CalledProcessError( + returncode=2, + cmd="test_command", + output="", + stderr="Called process error", + ) + + # Action & Assert: Call the function and verify that the + # CalledProcessError is re-raised + with pytest.raises(subprocess.CalledProcessError) as exc_info: + gcloud_commands.send_signed_challenges(signed_files, proposal) + + assert exc_info.value.returncode == 2 + assert exc_info.value.stderr == "Called process error" + assert exc_info.value.cmd == "test_command" + + +# Test case 4: Signed challenge file list is empty. +def test_send_signed_challenges_empty_list(mock_subprocess_run): + + # Action: Call the function + with pytest.raises(ValueError, match="signed_challenged_files is empty"): + gcloud_commands.send_signed_challenges([], test_proposal_resource) diff --git a/kms/singletenanthsm/requirements.txt b/kms/singletenanthsm/requirements.txt new file mode 100644 index 00000000000..12db531023d --- /dev/null +++ b/kms/singletenanthsm/requirements.txt @@ -0,0 +1,31 @@ +argcomplete==3.6.2 +attrs==25.3.0 +backports.tarfile==1.2.0 +cffi==1.17.1 +click==8.1.8 +colorlog==6.9.0 +cryptography==43.0.3 +dependency-groups==1.3.0 +distlib==0.3.9 +fido2==1.2.0 +filelock==3.18.0 +importlib_metadata==8.6.1 +iniconfig==2.0.0 +jaraco.classes==3.4.0 +jaraco.context==6.0.1 +jaraco.functools==4.1.0 +jeepney==0.8.0 +keyring==25.6.0 +more-itertools==10.6.0 +nox==2025.2.9 +packaging==24.2 +platformdirs==4.3.7 +pluggy==1.5.0 +pycparser==2.22 +pyscard==2.2.1 +pytest==8.3.4 +pytest-mock==3.14.0 +SecretStorage==3.3.3 +virtualenv==20.30.0 +yubikey-manager==5.5.1 +zipp==3.21.0 diff --git a/kms/singletenanthsm/setup.py b/kms/singletenanthsm/setup.py new file mode 100644 index 00000000000..3cd331b50ca --- /dev/null +++ b/kms/singletenanthsm/setup.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import argparse + +import gcloud_commands +import ykman_utils + + +def validate_operation(operation: str, management_key: str, pin: str): + if operation == "build_custom_gcloud": + try: + gcloud_commands.build_custom_gcloud() + except Exception as e: + raise Exception(f"Generating custom gcloud build failed {e}") + elif operation == "generate_rsa_keys": + try: + if not management_key or not pin: + raise ValueError( + "--management_key and --pin need to be specified for the generate_rsa_keys operation" + ) + ykman_utils.generate_private_key(management_key=management_key, pin=pin) + except Exception as e: + if not management_key or not pin: + raise ValueError( + "--management_key and --pin need to be specified for the generate_rsa_keys operation" + ) + raise Exception(f"Generating private keys failed {e}") + elif operation == "generate_gcloud_and_keys": + generate_private_keys_build_gcloud() + else: + raise Exception( + "Operation type not valid. Operation flag value must be build_custom_gcloud," + " generate_rsa_keys, or generate_gcloud_and_keys" + ) + + +def generate_private_keys_build_gcloud(management_key: str, pin: str): + """Generates an RSA key on slot 82 of every yubikey + connected to the local machine and builds the custom gcloud cli. + """ + try: + ykman_utils.generate_private_key(management_key=management_key, pin=pin) + except Exception as e: + raise Exception(f"Generating private keys failed {e}") + try: + gcloud_commands.build_custom_gcloud() + except Exception as e: + raise Exception(f"Generating custom gcloud build failed {e}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--operation", + type=str, + choices=[ + "build_custom_gcloud", + "generate_rsa_keys", + "generate_gcloud_and_keys", + ], + required=True, + ) + parser.add_argument( + "--management_key", + type=str, + required=False, + ) + parser.add_argument( + "--pin", + type=str, + required=False, + ) + args = parser.parse_args() + validate_operation(args.operation, args.management_key, args.pin) diff --git a/kms/singletenanthsm/ykman_fake.py b/kms/singletenanthsm/ykman_fake.py new file mode 100644 index 00000000000..c351aa39c1f --- /dev/null +++ b/kms/singletenanthsm/ykman_fake.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric import rsa + + +def generate_rsa_keys(key_size=2048): + """Generates a private and public RSA key pair with the specified key size.""" + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=key_size, + ) + public_key = private_key.public_key() + return private_key, public_key + + +def sign_data(private_key, data, hash_algorithm=hashes.SHA256()): + """Signs the provided data using the private key with PKCS#1.5 padding.""" + if not isinstance(data, bytes): + raise TypeError("Data must be of type bytes") + signature = private_key.sign(data, padding.PKCS1v15(), hash_algorithm) + return signature + + +def verify_signature(public_key, data, signature, hash_algorithm=hashes.SHA256()): + """Verifies the signature of the data using the public key.""" + if not isinstance(data, bytes): + raise TypeError("Data must be of type bytes") + if not isinstance(signature, bytes): + raise TypeError("Signature must be of type bytes") + try: + public_key.verify(signature, data, padding.PKCS1v15(), hash_algorithm) + return True # Signature is valid + except InvalidSignature: + return False # Signature is invalid + + +if __name__ == "__main__": + private_key, public_key = generate_rsa_keys() + + # Data to sign (as bytes) + data_to_sign = b"This is the data to be signed." + signature = sign_data(private_key, data_to_sign) + print(f"Signature generated: {signature.hex()}") + is_valid = verify_signature(public_key, data_to_sign, signature) + if is_valid: + print("Signature is VALID.") + else: + print("Signature is INVALID.") diff --git a/kms/singletenanthsm/ykman_utils.py b/kms/singletenanthsm/ykman_utils.py new file mode 100644 index 00000000000..fd88cb8117d --- /dev/null +++ b/kms/singletenanthsm/ykman_utils.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +from dataclasses import dataclass +import os +import pathlib +import re + +import cryptography.exceptions +from cryptography.hazmat.primitives import _serialization +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.serialization import load_pem_public_key +from ykman import piv +from ykman.device import list_all_devices +from yubikit.piv import hashes, PIN_POLICY, TOUCH_POLICY +from yubikit.piv import SmartCardConnection + +DEFAULT_MANAGEMENT_KEY = "010203040506070801020304050607080102030405060708" +DEFAULT_PIN = "123456" + + +def generate_private_key( + key_type=piv.KEY_TYPE.RSA2048, + management_key=DEFAULT_MANAGEMENT_KEY, + pin=DEFAULT_PIN, +): + """Generates a private key on the yubikey.""" + + devices = list_all_devices() + if not devices: + raise ValueError("no yubikeys found") + print(f"{len(devices)} yubikeys detected") + for yubikey, device_info in devices: + with yubikey.open_connection(SmartCardConnection) as connection: + piv_session = piv.PivSession(connection) + piv_session.authenticate( + piv.MANAGEMENT_KEY_TYPE.TDES, + bytes.fromhex(management_key), + ) + piv_session.verify_pin(pin) + + public_key = piv_session.generate_key( + piv.SLOT.RETIRED1, + key_type=key_type, + pin_policy=PIN_POLICY.DEFAULT, + touch_policy=TOUCH_POLICY.ALWAYS, + ) + if not public_key: + raise RuntimeError("failed to generate public key") + directory_path = "generated_public_keys" + if not os.path.exists(directory_path): + os.mkdir(directory_path) + print(f"Directory '{directory_path}' created.") + + with open( + f"generated_public_keys/public_key_{device_info.serial}.pem", "wb" + ) as binary_file: + + # Write bytes to file + binary_file.write( + public_key.public_bytes( + encoding=_serialization.Encoding.PEM, + format=_serialization.PublicFormat.SubjectPublicKeyInfo, + ) + ) + print( + f"Private key pair generated on device {device_info.serial} on key" + f" slot: {piv.SLOT.RETIRED1}" + ) + + +@dataclass +class Challenge: + """Represents a challenge with its associated public key.""" + + challenge: bytes + public_key_pem: str + + def to_dict(self): + return { + "challenge": base64.b64encode(self.challenge).decode("utf-8"), + "public_key_pem": self.public_key_pem, + } + + @staticmethod + def from_dict(data): + if not isinstance(data, dict): + return None + return Challenge( + challenge=base64.b64decode(data["challenge"]), + public_key_pem=data["public_key_pem"], + ) + + +class ChallengeReply: + + def __init__(self, unsigned_challenge, signed_challenge, public_key_pem): + self.unsigned_challenge = unsigned_challenge + self.signed_challenge = signed_challenge + self.public_key_pem = public_key_pem + + +def populate_challenges_from_files() -> list[Challenge]: + """Populates challenges and their corresponding public keys from files. + + This function searches for files matching the patterns + "challenges/public_key*.pem" + and "challenges/challenge*.bin" in the current working directory. It then + pairs each challenge with its corresponding public key based on matching + numeric IDs in the filenames. + + Returns: + list[Challenge]: A list of Challenge objects, each containing a challenge + and its associated public key. + """ + public_key_files = list(pathlib.Path.cwd().glob("challenges/public_key*.pem")) + print(public_key_files) + challenge_files = list(pathlib.Path.cwd().glob("challenges/challenge*.bin")) + print(challenge_files) + + challenges = [] + + for public_key_file in public_key_files: + challenge_id = re.findall(r"\d+", str(public_key_file)) + for challenge_file in challenge_files: + if challenge_id == re.findall(r"\d+", str(challenge_file)): + print(public_key_file) + file = open(public_key_file, "r") + public_key_pem = file.read() + file.close() + file = open(challenge_file, "rb") + challenge = file.read() + file.close() + challenges.append(Challenge(challenge, public_key_pem)) + return challenges + + +def sign_challenges( + challenges: list[Challenge], management_key=DEFAULT_MANAGEMENT_KEY, pin=DEFAULT_PIN +) -> list[ChallengeReply]: + """Signs a proposal's challenges using a Yubikey.""" + if not challenges: + raise ValueError("Challenge list empty: No challenges to sign.") + signed_challenges = [] + devices = list_all_devices() + if not devices: + raise ValueError("no yubikeys found") + for yubikey, _ in devices: + with yubikey.open_connection(SmartCardConnection) as connection: + # Make PivSession and fetch public key from Signature slot. + piv_session = piv.PivSession(connection) + # authenticate + piv_session.authenticate( + piv.MANAGEMENT_KEY_TYPE.TDES, + bytes.fromhex(management_key), + ) + piv_session.verify_pin(pin) + + # Get the public key from slot 82. + slot_metadata = piv_session.get_slot_metadata(slot=piv.SLOT.RETIRED1) + print(slot_metadata.public_key.public_bytes) + + # Check to see if any of the challenge public keys matches with the + # public key from slot 82. + for challenge in challenges: + key_public_bytes = slot_metadata.public_key.public_bytes( + encoding=_serialization.Encoding.PEM, + format=_serialization.PublicFormat.SubjectPublicKeyInfo, + ) + print(key_public_bytes.decode()) + print(challenge.public_key_pem) + if key_public_bytes == challenge.public_key_pem.encode(): + + # sign the challenge + print("Press Yubikey to sign challenge") + signed_challenge = piv_session.sign( + slot=piv.SLOT.RETIRED1, + key_type=slot_metadata.key_type, + message=challenge.challenge, + hash_algorithm=hashes.SHA256(), + padding=padding.PKCS1v15(), + ) + + signed_challenges.append( + ChallengeReply( + challenge.challenge, + signed_challenge, + challenge.public_key_pem, + ) + ) + print("Challenge signed successfully") + if not signed_challenges: + raise RuntimeError( + "No matching public keys between Yubikey and challenges. Make sure" + " key is generated in correct slot" + ) + return signed_challenges + + +def urlsafe_base64_to_binary(urlsafe_string: str) -> bytes: + """Converts a URL-safe base64 encoded string to its binary equivalent. + + Args: + urlsafe_string: The URL-safe base64 encoded string. + + Returns: + The binary data as bytes, or None if an error occurs. + + Raises: + TypeError: If the input is not a string. + ValueError: If the input string is not valid URL-safe base64. + """ + try: + if not isinstance(urlsafe_string, str): + raise TypeError("Input must be a string") + # Check if the input string contains only URL-safe base64 characters + if not re.match(r"^[a-zA-Z0-9_-]*$", urlsafe_string): + raise ValueError("Input string contains invalid characters") + # Add padding if necessary. Base64 requires padding to be a multiple of 4 + missing_padding = len(urlsafe_string) % 4 + if missing_padding: + urlsafe_string += "=" * (4 - missing_padding) + return base64.urlsafe_b64decode(urlsafe_string) + except base64.binascii.Error as e: + raise ValueError(f"Invalid URL-safe base64 string: {e}") from e + + +def verify_challenge_signatures(challenge_replies: list[ChallengeReply]): + """Verifies the signatures of a list of challenge replies. + + Args: + challenge_replies: A list of ChallengeReply objects. + + Raises: + ValueError: If the list of challenge replies is empty. + cryptography.exceptions.InvalidSignature: If a signature is invalid. + """ + if not challenge_replies: + raise ValueError("No signed challenges to verify") + for challenge_reply in challenge_replies: + public_key = load_pem_public_key(challenge_reply.public_key_pem.encode()) + try: + public_key.verify( + challenge_reply.signed_challenge, + challenge_reply.unsigned_challenge, + padding.PKCS1v15(), + hashes.SHA256(), + ) + print("Signature verification success") + except cryptography.exceptions.InvalidSignature as e: + raise cryptography.exceptions.InvalidSignature( + f"Signature verification failed: {e}" + ) diff --git a/kms/singletenanthsm/ykman_utils_test.py b/kms/singletenanthsm/ykman_utils_test.py new file mode 100644 index 00000000000..bcff1c4320f --- /dev/null +++ b/kms/singletenanthsm/ykman_utils_test.py @@ -0,0 +1,108 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pathlib + +import cryptography.exceptions + +import pytest + +import ykman_utils + + +class Challenge: + + def __init__(self, challenge, public_key_pem): + self.challenge = challenge + self.public_key_pem = public_key_pem + + +class ChallengeReply: + + def __init__(self, signed_challenge, public_key_pem): + self.signed_challenge = signed_challenge + self.public_key_pem = public_key_pem + + +challenge_test_data = b"test_data" + + +def generate_test_challenge_files(): + # Create challenges list from challenges directory + challenges = ykman_utils.populate_challenges_from_files() + for challenge in challenges: + print(challenge.challenge) + print(challenge.public_key_pem) + # Sign challenges + signed_challenges = ykman_utils.sign_challenges(challenges) + # Use a sample challenge from the HSM + ykman_utils.verify_challenge_signatures( + signed_challenges, + b"rddK-SCLvik55PPoxOxgjoZEnQ7kTttvtYg2-zYhpGsDjpsPEFw_2OKau1EFf3nN", + ) + + +# A yubikey connected to your local machine will be needed to run these tests. +# The generate_private_key() method will rewrite the key saved on slot 82(Retired1). +@pytest.fixture(autouse=True) +def key_setup(): + ykman_utils.generate_private_key() + + +def challenges(): + public_key_files = [ + key_file + for key_file in pathlib.Path.cwd().glob("generated_public_keys/public_key*.pem") + ] + challenges = [] + + for public_key_file in public_key_files: + file = open(public_key_file, "r") + public_key_pem = file.read() + challenges.append(Challenge(challenge_test_data, public_key_pem)) + return challenges + + +def test_sign_and_verify_challenges(): + signed_challenges = ykman_utils.sign_challenges(challenges()) + ykman_utils.verify_challenge_signatures(signed_challenges) + + +def test_verify_mismatching_data_fail(): + with pytest.raises(cryptography.exceptions.InvalidSignature) as exec_info: + signed_challenges = ykman_utils.sign_challenges(challenges()) + signed_challenges[0].signed_challenge = b"mismatched_data" + ykman_utils.verify_challenge_signatures(signed_challenges) + assert "Signature verification failed" in str(exec_info.value) + + +def test_sign_empty_challenge_list_fail(): + with pytest.raises(Exception) as exec_info: + ykman_utils.sign_challenges([]) + assert "Challenge list empty" in str(exec_info.value) + + +def test_sign_no_matching_public_keys_fail(): + modified_challenges = challenges() + for challenge in modified_challenges: + challenge.public_key_pem = "modified_public_key" + with pytest.raises(Exception) as exec_info: + ykman_utils.sign_challenges(modified_challenges) + assert "No matching public keys" in str(exec_info.value) + + +def test_verify_empty_challenge_replies_fail(): + with pytest.raises(Exception) as exec_info: + ykman_utils.verify_challenge_signatures([]) + assert "No signed challenges to verify" in str(exec_info)