Skip to content

Commit 8db6584

Browse files
authored
Merge pull request #3 from itk-dev-rpa/release/1.0.0
Release/1.0.0
2 parents 6152783 + 00f274d commit 8db6584

File tree

10 files changed

+357
-8
lines changed

10 files changed

+357
-8
lines changed

.github/workflows/Linting.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ on: [pull_request]
44

55
jobs:
66
build:
7-
runs-on: windows-latest
7+
runs-on: ubuntu-latest
88
strategy:
99
matrix:
1010
python-version: ["3.11"]

README.md

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,49 @@
1-
# python-serviceplatformen
1+
# python_serviceplatformen
22

3-
Python modules to easier use Kombit's Serviceplatformen API
3+
This project is made to hopefully make it easier to use Kombit's Serviceplatformen API.
4+
5+
## Certificates and authentication
6+
7+
To use Kombit's Serviceplatformen you need a valid OCES3 certificate registered for your project.
8+
Ask your local Kombit systems architect for help with this.
9+
10+
You need specific access to each service on Serviceplatformen you want to use.
11+
One certificate can have access to multiple services.
12+
13+
This project needs your certificate to be in a unified PEM format. That is
14+
with the public and private key in a single file.
15+
16+
If your certificate is in P12-format you can convert it using openssl:
17+
18+
```bash
19+
openssl pkcs12 -in Certificate.p12 -out Certificate.pem -nodes
20+
```
21+
22+
Due to limitations in Python's implementation of SSL your certificate needs to exist as
23+
a file on the system while using this library.
24+
25+
When your certificate is registered and in PEM-format, you simply hand it to the KombitAccess
26+
class and it will handle the rest for you.
27+
28+
```python
29+
from python_serviceplatformen.authentication import KombitAccess
30+
ka = KombitAccess(cvr=cvr, cert_path=cert_path)
31+
```
32+
33+
## Tests
34+
35+
This project contains automated tests in the "tests" folder.
36+
To run these test you need to install the developer dependecies:
37+
38+
```bash
39+
pip install python_serviceplatformen[dev]
40+
```
41+
42+
### Environment variables
43+
44+
Create a .env file in the project folder and fill out these variables:
45+
46+
```yaml
47+
KOMBIT_TEST_CVR = "XXXXXXXX" # The cvr of the organization who owns the certificate
48+
KOMBIT_TEST_CERT_PATH = "C:\something\Certificate.pem" # The path to the certificate file
49+
```

changelog.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

88
## [Unreleased]
9+
10+
## [1.0.0] - 19-09-2024
11+
12+
### Added
13+
14+
- Module for authenticating towards Kombit's Serviceplatform API.
15+
- Function for checking if someone is registered for Digital Post or NemSMS.
16+
17+
[Unreleased]: https://github.com/itk-dev-rpa/python-serviceplatformen/compare/1.0.0...HEAD
18+
[1.0.0]: https://github.com/itk-dev-rpa/python-serviceplatformen/releases/tag/1.0.0

pyproject.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ requires = ["setuptools>=65.0"]
33
build-backend = "setuptools.build_meta"
44

55
[project]
6-
name = "python-serviceplatformen"
7-
version = "0.0.1"
6+
name = "python_serviceplatformen"
7+
version = "1.0.0"
88
authors = [
99
{ name="ITK Development", email="[email protected]" },
1010
]
@@ -16,12 +16,13 @@ classifiers = [
1616
"License :: OSI Approved :: MIT License",
1717
]
1818
dependencies = [
19-
"requests == 2.*"
19+
"requests == 2.*",
20+
"cryptography"
2021
]
2122

2223
[project.urls]
23-
"Homepage" = "https://github.com/itk-dev-rpa/python-serviceplatformen"
24-
"Bug Tracker" = "https://github.com/itk-dev-rpa/python-serviceplatformen/issues"
24+
"Homepage" = "https://github.com/itk-dev-rpa/python_serviceplatformen"
25+
"Bug Tracker" = "https://github.com/itk-dev-rpa/python_serviceplatformen/issues"
2526

2627
[project.optional-dependencies]
2728
dev = [
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
"""This module contains the KombitAccess class used to authenticate against the
2+
Kombit API.
3+
"""
4+
5+
from datetime import datetime, timedelta
6+
7+
import requests
8+
from requests.exceptions import HTTPError
9+
from cryptography import x509
10+
from cryptography.hazmat.backends import default_backend
11+
from cryptography.hazmat.primitives import serialization
12+
13+
14+
# pylint: disable-next=too-few-public-methods
15+
class KombitAccess:
16+
"""An object that handles access to the Kombit api."""
17+
cvr: str
18+
cert_path: str
19+
_access_tokens: dict[str, (str, datetime)]
20+
test: bool
21+
environment: str
22+
23+
def __init__(self, cvr: str, cert_path: str, test: bool = False) -> None:
24+
"""Create a new Kombit Access object.
25+
26+
Args:
27+
cvr: The cvr number of the organisation making the calls.
28+
cert_path: The path to the certificate in unified pem-format.
29+
test: Whether to use the test environment or not.
30+
"""
31+
self.cvr = cvr
32+
self.cert_path = cert_path
33+
self._access_tokens = {}
34+
self.test = test
35+
36+
if test:
37+
self.environment = "https://exttest.serviceplatformen.dk"
38+
else:
39+
self.environment = "https://prod.serviceplatformen.dk"
40+
41+
def get_access_token(self, entity_id: str) -> str:
42+
"""Get an access token to the api endpoint with the given entity id.
43+
If an access token already exists for the endpoint it is reused.
44+
45+
Args:
46+
entity_id: The entity id of the endpoint.
47+
test: Whether to use the test api or not.
48+
49+
Returns:
50+
An access token to be used in api calls.
51+
52+
Raises:
53+
HTTPError: If an access token couldn't be obtained for the given entity id.
54+
"""
55+
if entity_id in self._access_tokens and datetime.now() < self._access_tokens[entity_id][1]:
56+
return self._access_tokens[entity_id][0]
57+
58+
try:
59+
saml_token = _get_saml_token(self.cvr, self.cert_path, entity_id, test=self.test)
60+
access_token = _get_access_token(saml_token, self.cert_path, test=self.test)
61+
self._access_tokens[entity_id] = access_token
62+
return access_token[0]
63+
except HTTPError as exc:
64+
raise ValueError(f"Couldn't obtain access token for {entity_id}") from exc
65+
66+
67+
def _get_saml_token(cvr: str, cert_path: str, entity_id: str, test: bool) -> str:
68+
"""Get a SAML token for the endpoint with the given entity id.
69+
70+
Args:
71+
cvr: The cvr number of the organisation making the calls.
72+
cert_path: The path to the certificate in unified pem-format.
73+
entity_id: The entity id of the endpoint.
74+
test: Whether to use the test api or not.
75+
76+
Returns:
77+
A SAML token as a string.
78+
"""
79+
use_key = _extract_first_certificate(cert_path)
80+
81+
if test:
82+
url = "https://adgangsstyring.eksterntest-stoettesystemerne.dk/runtime/api/rest/wstrust/v1/issue"
83+
else:
84+
url = "https://adgangsstyring.stoettesystemerne.dk/runtime/api/rest/wstrust/v1/issue"
85+
86+
payload = {
87+
"TokenType": "http://docs.oasis-open.org/wss/oasis-wss-saml-token-profile-1.1#SAMLV2.0",
88+
"RequestType": "http://docs.oasis-open.org/ws-sx/ws-trust/200512/Issue",
89+
"KeyType": "http://docs.oasis-open.org/ws-sx/ws-trust/200512/PublicKey",
90+
"AnvenderKontekst": {
91+
"Cvr": cvr
92+
},
93+
"UseKey": use_key,
94+
"AppliesTo": {
95+
"EndpointReference": {
96+
"Address": entity_id
97+
}
98+
},
99+
"OnBehalfOf": None
100+
}
101+
102+
response = requests.post(url, json=payload, cert=cert_path, timeout=10)
103+
response.raise_for_status()
104+
105+
return response.json()['RequestedSecurityToken']['Assertion']
106+
107+
108+
def _get_access_token(saml_token: str, cert_path: str, test: bool) -> tuple[str, datetime]:
109+
"""Get an access token for the given SAML context.
110+
111+
Args:
112+
saml_token: The SAML token to get the access token for.
113+
cert_path: The path to the certificate in unified pem-format.
114+
test: Whether to use the test api or not.
115+
116+
Returns:
117+
The access token as a string and the expiry datetime of the token.
118+
"""
119+
if test:
120+
url = "https://exttest.serviceplatformen.dk/service/AccessTokenService_1/token"
121+
else:
122+
url = "https://prod.serviceplatformen.dk/service/AccessTokenService_1/token"
123+
124+
payload = {
125+
"saml-token": saml_token
126+
}
127+
128+
response = requests.post(url, data=payload, cert=cert_path, timeout=10)
129+
response.raise_for_status()
130+
response = response.json()
131+
132+
access_token = f"{response['token_type']} {response['access_token']}"
133+
expiry_time = datetime.now() + timedelta(seconds=response['expires_in']) - timedelta(minutes=5)
134+
135+
return access_token, expiry_time
136+
137+
138+
def _extract_first_certificate(pem_file: str) -> str:
139+
"""Extract the first certificate from a certificate file.
140+
141+
Args:
142+
pem_file: The path of the pem certificate.
143+
144+
Raises:
145+
ValueError: If the certificate couldn't be parsed.
146+
147+
Returns:
148+
The first certificate in the file as a single line string.
149+
"""
150+
with open(pem_file, "rb") as f:
151+
pem_data = f.read()
152+
153+
try:
154+
cert = x509.load_pem_x509_certificate(pem_data, default_backend())
155+
first_cert = cert.public_bytes(encoding=serialization.Encoding.PEM).decode("utf-8")
156+
first_cert_single_line = "".join(first_cert.splitlines()[1:-1])
157+
return first_cert_single_line
158+
except Exception as e:
159+
raise ValueError("Error parsing certificate") from e
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
"""This module contains function to help with datetimes in the Kombit API."""
2+
3+
from datetime import datetime
4+
5+
6+
def format_datetime(_datetime: datetime) -> str:
7+
"""Convert a datetime object to a string with the format:
8+
%Y-%m-%dT%H:%M:%SZ
9+
"""
10+
return _datetime.strftime('%Y-%m-%dT%H:%M:%SZ')
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""This moduel contains helper functions to use the SF1601 Kombit API.
2+
https://digitaliseringskataloget.dk/integration/sf1601
3+
"""
4+
5+
import urllib.parse
6+
import uuid
7+
from datetime import datetime
8+
from typing import Literal
9+
10+
import requests
11+
12+
from python_serviceplatformen.authentication import KombitAccess
13+
from python_serviceplatformen.date_helper import format_datetime
14+
15+
16+
def is_registered(cpr: str, service: Literal['digitalpost', 'nemsms'], kombit_access: KombitAccess) -> bool:
17+
"""Check if the person with the given cpr number is registered for
18+
either Digital Post or NemSMS.
19+
20+
Args:
21+
cpr: The cpr number of the person to look up.
22+
service: The service to look up for.
23+
kombit_access: The KombitAccess object used to authenticate.
24+
25+
Returns:
26+
True if the person is registered for the selected service.
27+
"""
28+
url = urllib.parse.urljoin(kombit_access.environment, "service/PostForespoerg_1/")
29+
url = urllib.parse.urljoin(url, service)
30+
31+
parameters = {
32+
"cprNumber": cpr
33+
}
34+
35+
headers = {
36+
"X-TransaktionsId": str(uuid.uuid4()),
37+
"X-TransaktionsTid": format_datetime(datetime.now()),
38+
"authorization": kombit_access.get_access_token("http://entityid.kombit.dk/service/postforespoerg/1")
39+
}
40+
41+
response = requests.get(url, params=parameters, headers=headers, timeout=10)
42+
response.raise_for_status()
43+
return response.json()['result']

tests/test_auth.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Tests of Kombit API authentication."""
2+
import unittest
3+
import os
4+
5+
from dotenv import load_dotenv
6+
7+
from python_serviceplatformen.authentication import KombitAccess
8+
9+
load_dotenv()
10+
11+
# We don't care about duplicate code in tests
12+
# pylint: disable=R0801
13+
14+
15+
class KombitAuthTest(unittest.TestCase):
16+
"""Test authentication against the Kombit API."""
17+
18+
def test_kombit_access(self):
19+
"""Test authentication."""
20+
cvr = os.environ["KOMBIT_TEST_CVR"]
21+
cert_path = os.environ["KOMBIT_TEST_CERT_PATH"]
22+
ka = KombitAccess(cvr=cvr, cert_path=cert_path, test=True)
23+
24+
# Test getting a token
25+
token = ka.get_access_token("http://entityid.kombit.dk/service/postforespoerg/1")
26+
self.assertIsInstance(token, str)
27+
self.assertGreater(len(token), 0)
28+
29+
# Test reuse of token
30+
token2 = ka.get_access_token("http://entityid.kombit.dk/service/postforespoerg/1")
31+
self.assertEqual(token, token2)
32+
33+
# Test getting a nonsense token
34+
with self.assertRaises(ValueError):
35+
ka.get_access_token("FooBar")
36+
37+
38+
if __name__ == '__main__':
39+
unittest.main()

tests/test_digital_post.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
"""Tests of the Kombit Digital Post API."""
2+
import unittest
3+
import os
4+
5+
from dotenv import load_dotenv
6+
7+
from python_serviceplatformen.authentication import KombitAccess
8+
from python_serviceplatformen import digital_post
9+
10+
load_dotenv()
11+
12+
# We don't care about duplicate code in tests
13+
# pylint: disable=R0801
14+
15+
16+
class DigitalPostTest(unittest.TestCase):
17+
"""Test Digital Post functionality in the Kombit API."""
18+
19+
def test_is_registered(self):
20+
"""Test authentication."""
21+
cvr = os.environ["KOMBIT_TEST_CVR"]
22+
cert_path = os.environ["KOMBIT_TEST_CERT_PATH"]
23+
ka = KombitAccess(cvr=cvr, cert_path=cert_path, test=True)
24+
25+
# Fictional test cpr
26+
cpr = "2611740000"
27+
28+
result = digital_post.is_registered(cpr=cpr, service="digitalpost", kombit_access=ka)
29+
self.assertTrue(result)
30+
31+
result = digital_post.is_registered(cpr=cpr, service="nemsms", kombit_access=ka)
32+
self.assertFalse(result)
33+
34+
# Test with nonsense.
35+
# This should result in False
36+
result = digital_post.is_registered(cpr="FooBar", service="digitalpost", kombit_access=ka)
37+
self.assertFalse(result)
38+
39+
40+
if __name__ == '__main__':
41+
unittest.main()

0 commit comments

Comments
 (0)