4 changes: 4 additions & 0 deletions src/confcom/HISTORY.rst
@@ -3,6 +3,10 @@
Release History
===============

1.4.4
++++++
* Improve the package building process

1.4.3
++++++
* Fix installing OPA on Windows and in strict networking environments
56 changes: 27 additions & 29 deletions src/confcom/azext_confcom/cose_proxy.py
@@ -3,6 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import hashlib
import os
import platform
import stat
@@ -18,13 +19,30 @@
POLICY_FIELD_CONTAINERS_ELEMENTS_REGO_FRAGMENTS_MINIMUM_SVN,
REGO_CONTAINER_START, REGO_FRAGMENT_START)
from azext_confcom.errors import eprint
from azext_confcom.lib.paths import get_binaries_dir
from knack.log import get_logger


logger = get_logger(__name__)
host_os = platform.system()
machine = platform.machine()


_binaries_dir = get_binaries_dir()
_cosesign1_binaries = {
    "Linux": {
        "path": _binaries_dir / "sign1util",
        "url": "https://github.com/microsoft/cosesign1go/releases/download/v1.4.0/sign1util",
        "sha256": "526b54aeb6293fc160e8fa1f81be6857300aba9641d45955f402f8b082a4d4a5",
    },
    "Windows": {
        "path": _binaries_dir / "sign1util.exe",
        "url": "https://github.com/microsoft/cosesign1go/releases/download/v1.4.0/sign1util.exe",
        "sha256": "f33cccf2b1bb8c3a495c730984b47d0f0715678981dbfe712248a2452dd53303",
    },
}


def call_cose_sign_tool(args: List[str], error_message: str, check=False):
item = subprocess.run(args, check=check, capture_output=True, timeout=120)

@@ -38,35 +56,15 @@ class CoseSignToolProxy: # pylint: disable=too-few-public-methods

@staticmethod
def download_binaries():
dir_path = os.path.dirname(os.path.realpath(__file__))

bin_folder = os.path.join(dir_path, "bin")
if not os.path.exists(bin_folder):
os.makedirs(bin_folder)

# get the most recent release artifacts from github
r = requests.get("https://api.github.com/repos/microsoft/cosesign1go/releases")
r.raise_for_status()
needed_assets = ["sign1util", "sign1util.exe"]

# these should be newest to oldest
for release in r.json():
# search for both windows and linux binaries
needed_asset_info = [asset for asset in release["assets"] if asset["name"] in needed_assets]
if len(needed_asset_info) == len(needed_assets):
for asset in needed_asset_info:
# say which version we're downloading
print(f"Downloading integrity-vhd version {release['tag_name']}")
# get the download url for the dmverity-vhd file
exe_url = asset["browser_download_url"]
# download the file
r = requests.get(exe_url)
r.raise_for_status()
# save the file to the bin folder
with open(os.path.join(bin_folder, asset["name"]), "wb") as f:
f.write(r.content)
# stop iterating through releases
break

        for binary_info in _cosesign1_binaries.values():
            cosesign1_fetch_resp = requests.get(binary_info["url"], verify=True)
            cosesign1_fetch_resp.raise_for_status()

            assert hashlib.sha256(cosesign1_fetch_resp.content).hexdigest() == binary_info["sha256"]

            with open(binary_info["path"], "wb") as f:
                f.write(cosesign1_fetch_resp.content)

Copilot AI Nov 21, 2025

The downloaded binary files are missing executable permissions on Linux. The __init__ method (lines 81-87) attempts to add executable permissions later, but it's better to set them immediately after download. Add this after writing the file:

for binary_info in _cosesign1_binaries.values():
    cosesign1_fetch_resp = requests.get(binary_info["url"], verify=True)
    cosesign1_fetch_resp.raise_for_status()

    with open(binary_info["path"], "wb") as f:
        f.write(cosesign1_fetch_resp.content)
    
    # Set executable permissions on Unix-like systems
    if platform.system() != "Windows":
        os.chmod(binary_info["path"], 0o755)

def __init__(self):
script_directory = os.path.dirname(os.path.realpath(__file__))
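
The fetch, digest check, write, and permission steps repeated in each of these download_binaries implementations could be collapsed into one shared helper. The sketch below is illustrative only and not part of this change: the helper name download_pinned_binary, the chmod step from the comment above, and the explicit RuntimeError (mirroring the checksum suggestion made on opa.py further down) are all assumptions.

import hashlib
import os
import platform

import requests


def download_pinned_binary(path, url, sha256):
    # Fetch a release artifact pinned by URL and verify its digest before writing it out.
    resp = requests.get(url, verify=True, timeout=120)
    resp.raise_for_status()

    actual = hashlib.sha256(resp.content).hexdigest()
    if actual != sha256:
        raise RuntimeError(f"Checksum mismatch for {url}: expected {sha256}, got {actual}")

    with open(path, "wb") as f:
        f.write(resp.content)

    # Downloaded binaries need the executable bit on Unix-like systems.
    if platform.system() != "Windows":
        os.chmod(path, 0o755)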
94 changes: 39 additions & 55 deletions src/confcom/azext_confcom/kata_proxy.py
@@ -3,6 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import hashlib
import os
import platform
import stat
@@ -12,72 +13,55 @@
import requests
from azext_confcom.config import DATA_FOLDER
from azext_confcom.errors import eprint
from azext_confcom.lib.paths import get_binaries_dir, get_data_dir

host_os = platform.system()
machine = platform.machine()


_binaries_dir = get_binaries_dir()
_kata_binaries = {
    "Linux": {
        "path": _binaries_dir / "genpolicy-linux",
        "url": "https://github.com/microsoft/kata-containers/releases/download/3.2.0.azl3.genpolicy3/genpolicy",
        "sha256": "4cd497ca5e995ddacb53af4da47449c16291aea62e9f8b8ee0fe36ca8d41fe66",
    },
    "Windows": {
        "path": _binaries_dir / "genpolicy-windows.exe",
        "url": "https://github.com/microsoft/kata-containers/releases/download/3.2.0.azl1.genpolicy0/genpolicy.exe",
        "sha256": "caa9d8ee21b5819cc42b5c0967b14e166c715f6d4c87b574edabeaaeebf3573c",
    },
}
_data_dir = get_data_dir()
_kata_data = [
    {
        "path": _data_dir / "genpolicy-settings.json",
        "url": "https://github.com/microsoft/kata-containers/releases/download/3.2.0.azl3.genpolicy3/genpolicy-settings.json", # pylint: disable=line-too-long
        "sha256": "c38be1474b133d49800a43bd30c40e7585b5f302179a307f9c6d89f195daee94",
    },
    {
        "path": _data_dir / "rules.rego",
        "url": "https://github.com/microsoft/kata-containers/releases/download/3.2.0.azl3.genpolicy3/rules.rego",
        "sha256": "2ca6c0e9617f97a922724112bd738fd73881d35b9ae5d31d573f0871d1ecf897",
    },
]


class KataPolicyGenProxy: # pylint: disable=too-few-public-methods
# static variable to cache layer hashes between container groups
layer_cache = {}

@staticmethod
def download_binaries():
dir_path = os.path.dirname(os.path.realpath(__file__))

bin_folder = os.path.join(dir_path, "bin")
if not os.path.exists(bin_folder):
os.makedirs(bin_folder)

data_folder = os.path.join(dir_path, "data")
if not os.path.exists(data_folder):
os.makedirs(data_folder)

# get the most recent release artifacts from github
r = requests.get("https://api.github.com/repos/microsoft/kata-containers/releases")
r.raise_for_status()
bin_flag = False
needed_assets = ["genpolicy", "genpolicy.exe"]
# search for genpolicy in the assets from kata-container releases
for release in r.json():
is_target = (
"genpolicy" in release.get("tag_name") and
not release.get("draft") and
not release.get("prerelease")
)
if is_target:
# these should be newest to oldest
for asset in release["assets"]:
# download the file if it contains genpolicy
if asset["name"] in needed_assets:
# say which version we're downloading
print(f"Downloading genpolicy version {release['tag_name']}")
save_name = ""
if ".exe" in asset["name"]:
save_name = "genpolicy-windows.exe"
else:
save_name = "genpolicy-linux"
bin_flag = True
# get the download url for the genpolicy file
exe_url = asset["browser_download_url"]
# download the file
r = requests.get(exe_url)
r.raise_for_status()
# save the file to the bin folder
with open(os.path.join(bin_folder, save_name), "wb") as f:
f.write(r.content)

# download the rules.rego and genpolicy-settings.json files
if asset["name"] == "rules.rego" or asset["name"] == "genpolicy-settings.json":
# download the rules.rego file
exe_url = asset["browser_download_url"]
# download the file
r = requests.get(exe_url)
# save the file to the data folder
with open(os.path.join(data_folder, asset["name"]), "wb") as f:
f.write(r.content)
if bin_flag:
break

        for binary_info in list(_kata_binaries.values()) + _kata_data:
            kata_fetch_resp = requests.get(binary_info["url"], verify=True)
            kata_fetch_resp.raise_for_status()

            assert hashlib.sha256(kata_fetch_resp.content).hexdigest() == binary_info["sha256"]

            with open(binary_info["path"], "wb") as f:
                f.write(kata_fetch_resp.content)

def __init__(self):
script_directory = os.path.dirname(os.path.realpath(__file__))
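
Because the genpolicy binary and data files are now pinned by digest, bumping the release tag also means refreshing every sha256 value above. A hedged sketch of how those digests could be regenerated (the helper name is an assumption; the URL is the Linux genpolicy artifact pinned in this diff):

import hashlib

import requests


def pinned_sha256(url: str) -> str:
    # Download a release artifact and return the hex digest to pin in the table above.
    resp = requests.get(url, verify=True, timeout=120)
    resp.raise_for_status()
    return hashlib.sha256(resp.content).hexdigest()


if __name__ == "__main__":
    url = "https://github.com/microsoft/kata-containers/releases/download/3.2.0.azl3.genpolicy3/genpolicy"
    print(url, pinned_sha256(url))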
53 changes: 27 additions & 26 deletions src/confcom/azext_confcom/lib/opa.py
@@ -3,49 +3,50 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import platform
import requests
import hashlib
import json
import os
from pathlib import Path
import platform
import subprocess
from typing import Iterable

import requests

from azext_confcom.lib.binaries import get_binaries_dir

_opa_path = os.path.abspath(os.path.join(get_binaries_dir(), "opa"))
_opa_url = {
"Linux": "https://github.com/open-policy-agent/opa/releases/download/v1.10.1/opa_linux_amd64",
"Windows": "https://github.com/open-policy-agent/opa/releases/download/v1.10.1/opa_windows_amd64.exe",
}
_expected_sha256 = {
"Linux": "fe8e191d44fec33db2a3d0ca788b9f83f866d980c5371063620c3c6822792877",
"Windows": "4c932053350eabca47681208924046fbf3ad9de922d6853fb12cddf59aef15ce",
from typing import Iterable
from pathlib import Path
from azext_confcom.lib.paths import get_binaries_dir


_binaries_dir = get_binaries_dir()
_opa_binaries = {
    "Linux": {
        "path": _binaries_dir / "opa",
        "url": "https://github.com/open-policy-agent/opa/releases/download/v1.10.1/opa_linux_amd64",
        "sha256": "fe8e191d44fec33db2a3d0ca788b9f83f866d980c5371063620c3c6822792877",
    },
    "Windows": {
        "path": _binaries_dir / "opa.exe",
        "url": "https://github.com/open-policy-agent/opa/releases/download/v1.10.1/opa_windows_amd64.exe",
        "sha256": "4c932053350eabca47681208924046fbf3ad9de922d6853fb12cddf59aef15ce",
    },
}


def opa_get():

if not all(platform.system() in mapping for mapping in [_opa_url, _expected_sha256]):
raise RuntimeError(f"OPA is not supported on platform: {platform.system()}")

opa_fetch_resp = requests.get(_opa_url[platform.system()], verify=True)
opa_fetch_resp.raise_for_status()
for binary_info in _opa_binaries.values():
opa_fetch_resp = requests.get(binary_info["url"], verify=True)
opa_fetch_resp.raise_for_status()

assert hashlib.sha256(opa_fetch_resp.content).hexdigest() == _expected_sha256[platform.system()]
assert hashlib.sha256(opa_fetch_resp.content).hexdigest() == binary_info["sha256"]

Copilot AI Nov 21, 2025

Using assert for checksum validation is not recommended in production code, as assertions can be disabled with Python's -O optimization flag. Use an explicit check with a meaningful error message instead:

actual_hash = hashlib.sha256(opa_fetch_resp.content).hexdigest()
if actual_hash != binary_info["sha256"]:
    raise RuntimeError(f"Checksum verification failed for OPA binary. Expected: {binary_info['sha256']}, Got: {actual_hash}")


with open(_opa_path, "wb") as f:
f.write(opa_fetch_resp.content)
with open(binary_info["path"], "wb") as f:
f.write(opa_fetch_resp.content)

os.chmod(_opa_path, 0o755)
return _opa_path
os.chmod(binary_info["path"], 0o755)


def opa_run(args: Iterable[str]) -> subprocess.CompletedProcess:
return subprocess.run(
[_opa_path, *args],
[_opa_binaries[platform.system()]["path"], *args],
check=True,
stdout=subprocess.PIPE,
text=True,
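
A minimal usage sketch of the reworked module, assuming it is importable as azext_confcom.lib.opa exactly as the diff shows: opa_get() downloads and digest-checks both pinned OPA builds, and opa_run() invokes the one matching the current platform.

from azext_confcom.lib.opa import opa_get, opa_run

opa_get()                        # fetch the pinned OPA binaries and verify their SHA-256 digests
result = opa_run(["version"])    # e.g. runs <bin dir>/opa version on Linux
print(result.stdout)             # text output, since opa_run passes text=True to subprocess.run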
src/confcom/azext_confcom/lib/paths.py
@@ -3,11 +3,16 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
from pathlib import Path


def get_binaries_dir():
    binaries_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "bin")
    if not os.path.exists(binaries_dir):
        os.makedirs(binaries_dir)
    binaries_dir = Path(__file__).parent.parent / "bin"
    binaries_dir.mkdir(parents=True, exist_ok=True)
    return binaries_dir


def get_data_dir():
    data_dir = Path(__file__).parent.parent / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir
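
Both helpers now return pathlib.Path objects and create their directory on first use, so callers can join file names with the / operator, as the proxies above do. A short usage sketch (file names taken from the tables in this diff):

from azext_confcom.lib.paths import get_binaries_dir, get_data_dir

sign_tool = get_binaries_dir() / "sign1util"            # <package>/bin/sign1util
settings = get_data_dir() / "genpolicy-settings.json"   # <package>/data/genpolicy-settings.json
print(sign_tool, settings, sign_tool.parent.is_dir())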
55 changes: 27 additions & 28 deletions src/confcom/azext_confcom/rootfs_proxy.py
@@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------


import hashlib
import os
import platform
import stat
@@ -13,47 +14,45 @@

import requests
from azext_confcom.errors import eprint
from azext_confcom.lib.paths import get_binaries_dir
from knack.log import get_logger


host_os = platform.system()
machine = platform.machine()
logger = get_logger(__name__)


_binaries_dir = get_binaries_dir()
_dmverity_vhd_binaries = {
    "Linux": {
        "path": _binaries_dir / "dmverity-vhd",
        "url": "https://github.com/microsoft/integrity-vhd/releases/download/v1.6/dmverity-vhd",
        "sha256": "b8cf3fa3594e48070a31aa538d5b4b40d5b33b8ac18bc25a1816245159648fb0",
    },
    "Windows": {
        "path": _binaries_dir / "dmverity-vhd.exe",
        "url": "https://github.com/microsoft/integrity-vhd/releases/download/v1.6/dmverity-vhd.exe",
        "sha256": "ca0f95d798323f3ef26feb036112be9019f5ceaa6233ee2a65218d5a143ae474",
    },
}


class SecurityPolicyProxy: # pylint: disable=too-few-public-methods
# static variable to cache layer hashes between container groups
layer_cache = {}

@staticmethod
def download_binaries():
dir_path = os.path.dirname(os.path.realpath(__file__))

bin_folder = os.path.join(dir_path, "bin")
if not os.path.exists(bin_folder):
os.makedirs(bin_folder)

# get the most recent release artifacts from github
r = requests.get("https://api.github.com/repos/microsoft/integrity-vhd/releases")
r.raise_for_status()
needed_assets = ["dmverity-vhd", "dmverity-vhd.exe"]
# these should be newest to oldest
for release in r.json():
# search for both windows and linux binaries
needed_asset_info = [asset for asset in release["assets"] if asset["name"] in needed_assets]
if len(needed_asset_info) == len(needed_assets):
for asset in needed_asset_info:
# say which version we're downloading
print(f"Downloading integrity-vhd version {release['tag_name']}")
# get the download url for the dmverity-vhd file
exe_url = asset["browser_download_url"]
# download the file
r = requests.get(exe_url)
r.raise_for_status()
# save the file to the bin folder
with open(os.path.join(bin_folder, asset["name"]), "wb") as f:
f.write(r.content)
# stop iterating through releases
break

        for binary_info in _dmverity_vhd_binaries.values():
            dmverity_vhd_fetch_resp = requests.get(binary_info["url"], verify=True)
            dmverity_vhd_fetch_resp.raise_for_status()

            assert hashlib.sha256(dmverity_vhd_fetch_resp.content).hexdigest() == binary_info["sha256"]

            with open(binary_info["path"], "wb") as f:
                f.write(dmverity_vhd_fetch_resp.content)

def __init__(self):
script_directory = os.path.dirname(os.path.realpath(__file__))
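
As a hedged post-download sanity check (not part of the change): after download_binaries() runs, the platform's dmverity-vhd binary should exist under the shared bin directory, and on Linux it should be executable once the permission handling in __init__, or the chmod suggested in the cose_proxy.py comment above, has been applied.

import os
import platform

from azext_confcom.lib.paths import get_binaries_dir
from azext_confcom.rootfs_proxy import SecurityPolicyProxy

SecurityPolicyProxy.download_binaries()

name = "dmverity-vhd.exe" if platform.system() == "Windows" else "dmverity-vhd"
binary = get_binaries_dir() / name
print(binary, binary.exists(), os.access(binary, os.X_OK))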