Skip to content
Open
13 changes: 13 additions & 0 deletions .whitesource
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"scanSettings": {
"baseBranches": []
},
"checkRunSettings": {
"vulnerableCheckRunConclusionLevel": "failure",
"displayMode": "diff"
},
"issueSettings": {
"minSeverityLevel": "LOW",
"issueType": "DEPENDENCY"
}
}
160 changes: 97 additions & 63 deletions certbot_azure/azure_agw.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
from __future__ import print_function

import os
import sys
import logging
import time
import OpenSSL
import base64

try:
from secrets import token_urlsafe
except ImportError:
from os import urandom

def token_urlsafe(nbytes=None):
return urandom(nbytes)


import zope.component
import zope.interface

Expand All @@ -29,16 +31,22 @@ def token_urlsafe(nbytes=None):
from msrestazure.azure_exceptions import CloudError


MSDOCS = 'https://docs.microsoft.com/'
ACCT_URL = MSDOCS + 'python/azure/python-sdk-azure-authenticate?view=azure-python#mgmt-auth-file'
AZURE_CLI_URL = MSDOCS + 'cli/azure/install-azure-cli?view=azure-cli-latest'
AZURE_CLI_COMMAND = ("az ad sp create-for-rbac"
" --name Certbot --sdk-auth"
" --scope /subscriptions/<SUBSCRIPTION_ID>/resourceGroups/<RESOURCE_GROUP_ID>"
" > mycredentials.json")
MSDOCS = "https://docs.microsoft.com/"
ACCT_URL = (
MSDOCS
+ "python/azure/python-sdk-azure-authenticate?view=azure-python#mgmt-auth-file"
)
AZURE_CLI_URL = MSDOCS + "cli/azure/install-azure-cli?view=azure-cli-latest"
AZURE_CLI_COMMAND = (
"az ad sp create-for-rbac"
" --name Certbot --sdk-auth"
" --scope /subscriptions/<SUBSCRIPTION_ID>/resourceGroups/<RESOURCE_GROUP_ID>"
" > mycredentials.json"
)

logger = logging.getLogger(__name__)


@zope.interface.implementer(interfaces.IInstaller)
@zope.interface.provider(interfaces.IPluginFactory)
class Installer(common.Plugin):
Expand All @@ -47,52 +55,59 @@ class Installer(common.Plugin):

@classmethod
def add_parser_arguments(cls, add):
add('credentials',
add(
"credentials",
help=(
'Path to Azure service account JSON file. If you already have a Service ' +
'Principal with the required permissions, you can create your own file as per ' +
'the JSON file format at {0}. ' +
'Otherwise, you can create a new Service Principal using the Azure CLI ' +
'(available at {1}) by running "az login" then "{2}"' +
'This will create file "mycredentials.json" which you should secure, then ' +
'specify with this option or with the AZURE_AUTH_LOCATION environment variable.')
.format(ACCT_URL, AZURE_CLI_URL, AZURE_CLI_COMMAND),
default=None)
add('resource-group',
help=('Resource Group in which the DNS zone is located'),
default=None)
add('app-gateway-name',
help=('Name of the application gateway'),
default=None)
"Path to Azure service account JSON file. If you already have a Service "
+ "Principal with the required permissions, you can create your own file as per "
+ "the JSON file format at {0}. "
+ "Otherwise, you can create a new Service Principal using the Azure CLI "
+ '(available at {1}) by running "az login" then "{2}"'
+ 'This will create file "mycredentials.json" which you should secure, then '
+ "specify with this option or with the AZURE_AUTH_LOCATION environment variable."
).format(ACCT_URL, AZURE_CLI_URL, AZURE_CLI_COMMAND),
default=None,
)
add(
"resource-group",
help=("Resource Group in which the DNS zone is located"),
default=None,
)
add("app-gateway-name", help=("Name of the application gateway"), default=None)

def __init__(self, *args, **kwargs):
super(Installer, self).__init__(*args, **kwargs)
self._setup_credentials()

self.azure_client = _AzureClient(self.conf('resource-group'), self.conf('credentials'))

self.azure_client = _AzureClient(
self.conf("resource-group"), self.conf("credentials")
)

def _setup_credentials(self):
if self.conf('resource-group') is None:
raise errors.PluginError('Please specify a resource group using '
'--azure-agw-resource-group <RESOURCEGROUP>')
if self.conf("resource-group") is None:
raise errors.PluginError(
"Please specify a resource group using "
"--azure-agw-resource-group <RESOURCEGROUP>"
)

if self.conf('app-gateway-name') is None:
raise errors.PluginError('Please specify the app gateway name '
'--azure-agw-resource-group <RESOURCEGROUP>')
if self.conf("app-gateway-name") is None:
raise errors.PluginError(
"Please specify the app gateway name "
"--azure-agw-resource-group <RESOURCEGROUP>"
)

if self.conf(
'credentials') is None and 'AZURE_AUTH_LOCATION' not in os.environ:
if self.conf("credentials") is None and "AZURE_AUTH_LOCATION" not in os.environ:
raise errors.PluginError(
'Please specify credentials file using the '
'AZURE_AUTH_LOCATION environment variable or '
'using --azure-agw-credentials <file>')
"Please specify credentials file using the "
"AZURE_AUTH_LOCATION environment variable or "
"using --azure-agw-credentials <file>"
)

def prepare(self): # pylint: disable=missing-docstring,no-self-use
pass # pragma: no cover

def more_info(self): # pylint: disable=missing-docstring,no-self-use
return ("")
return ""

def get_all_names(self): # pylint: disable=missing-docstring,no-self-use
pass # pragma: no cover
Expand All @@ -102,9 +117,13 @@ def deploy_cert(self, domain, cert_path, key_path, chain_path, fullchain_path):
Upload Certificate to the app gateway
"""

self.azure_client.update_agw(self.conf('app-gateway-name'),domain, key_path, fullchain_path)
self.azure_client.update_agw(
self.conf("app-gateway-name"), domain, key_path, fullchain_path
)

def enhance(self, domain, enhancement, options=None): # pylint: disable=missing-docstring,no-self-use
def enhance(
self, domain, enhancement, options=None
): # pylint: disable=missing-docstring,no-self-use
pass # pragma: no cover

def supported_enhancements(self): # pylint: disable=missing-docstring,no-self-use
Expand All @@ -113,10 +132,14 @@ def supported_enhancements(self): # pylint: disable=missing-docstring,no-self-u
def get_all_certs_keys(self): # pylint: disable=missing-docstring,no-self-use
pass # pragma: no cover

def save(self, title=None, temporary=False): # pylint: disable=missing-docstring,no-self-use
def save(
self, title=None, temporary=False
): # pylint: disable=missing-docstring,no-self-use
pass # pragma: no cover

def rollback_checkpoints(self, rollback=1): # pylint: disable=missing-docstring,no-self-use
def rollback_checkpoints(
self, rollback=1
): # pylint: disable=missing-docstring,no-self-use
pass # pragma: no cover

def recovery_routine(self): # pylint: disable=missing-docstring,no-self-use
Expand All @@ -131,31 +154,38 @@ def config_test(self): # pylint: disable=missing-docstring,no-self-use
def restart(self): # pylint: disable=missing-docstring,no-self-use
pass # pragma: no cover

def renew_deploy(self, lineage, *args, **kwargs): # pylint: disable=missing-docstring,no-self-use
def renew_deploy(
self, lineage, *args, **kwargs
): # pylint: disable=missing-docstring,no-self-use
"""
Renew certificates when calling `certbot renew`
"""

# Run deploy_cert with the lineage params
self.deploy_cert(lineage.names()[0], lineage.cert_path, lineage.key_path, lineage.chain_path, lineage.fullchain_path)
self.deploy_cert(
lineage.names()[0],
lineage.cert_path,
lineage.key_path,
lineage.chain_path,
lineage.fullchain_path,
)

return



class _AzureClient(object):
"""
Encapsulates all communication with the Azure Cloud DNS API.
"""

def __init__(self, resource_group, account_json=None):
self.resource_group = resource_group
self.resource_client = get_client_from_auth_file(ResourceManagementClient,
auth_path=account_json)
self.network_client = get_client_from_auth_file(NetworkManagementClient,
auth_path=account_json)


self.resource_client = get_client_from_auth_file(
ResourceManagementClient, auth_path=account_json
)
self.network_client = get_client_from_auth_file(
NetworkManagementClient, auth_path=account_json
)

def update_agw(self, agw_name, domain, key_path, fullchain_path):
from azure.mgmt.network.models import ApplicationGatewaySslCertificate
Expand All @@ -164,10 +194,14 @@ def update_agw(self, agw_name, domain, key_path, fullchain_path):
password = token_urlsafe(16)

# Get app gateway from client
agw = self.network_client.application_gateways.get(self.resource_group, agw_name)
agw = self.network_client.application_gateways.get(
self.resource_group, agw_name
)

if "Updating" in [ssl.provisioning_state for ssl in agw.ssl_certificates]:
raise errors.PluginError('There is a certificate in Updating state. Cowardly refusing to add a new one.')
raise errors.PluginError(
"There is a certificate in Updating state. Cowardly refusing to add a new one."
)

ssl = ApplicationGatewaySslCertificate()
ssl.name = domain + str(int(time.time()))
Expand All @@ -177,12 +211,14 @@ def update_agw(self, agw_name, domain, key_path, fullchain_path):
agw.ssl_certificates.append(ssl)

try:
self.network_client.application_gateways.create_or_update(self.resource_group,
agw_name,
agw)
self.network_client.application_gateways.create_or_update(
self.resource_group, agw_name, agw
)
except CloudError as e:
logger.warning('Encountered error updating app gateway: %s', e)
raise errors.PluginError('Error communicating with the Azure API: {0}'.format(e))
logger.warning("Encountered error updating app gateway: %s", e)
raise errors.PluginError(
"Error communicating with the Azure API: {0}".format(e)
)

def _generate_pfx_from_pems(self, key_path, fullchain_path, password):
"""Generate PFX file out of PEMs in order to meet App Gateway requirements"""
Expand All @@ -196,9 +232,7 @@ def _generate_pfx_from_pems(self, key_path, fullchain_path, password):
# Load Key into PKCS12 object
with open(key_path, "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None,
backend=default_backend()
key_file.read(), password=None, backend=default_backend()
)

key = OpenSSL.crypto.PKey.from_cryptography_key(private_key)
Expand All @@ -207,8 +241,8 @@ def _generate_pfx_from_pems(self, key_path, fullchain_path, password):
# Load Cert into PKCS12 object
with open(fullchain_path, "rb") as cert_file:
crypto_cert = x509.load_pem_x509_certificate(
cert_file.read(),
default_backend())
cert_file.read(), default_backend()
)

cert = OpenSSL.crypto.X509.from_cryptography(crypto_cert)
p12.set_certificate(cert)
Expand Down
Loading