Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 126 additions & 79 deletions pastila.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
#!/usr/bin/python3
import sys, requests, json, re, base64, os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

import base64
import json
import os
import sys
from random import randint
from typing import List
from urllib.parse import urlparse
from urllib.request import Request, urlopen

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

#
# Script to upload/download to/from pastila.nl from command line.
Expand All @@ -15,13 +24,13 @@
#


def sipHash128(m: bytes):
def sipHash128(m: bytes) -> str:
mask = (1 << 64) - 1
def rotl(v, offset, bits):

def rotl(v: List[int], offset: int, bits: int) -> None:
v[offset] = ((v[offset] << bits) & mask) | ((v[offset] & mask) >> (64 - bits))

def compress(v):
def compress(v: List[int]) -> None:
v[0] += v[1]
v[2] += v[3]
rotl(v, 1, 13)
Expand All @@ -37,21 +46,21 @@ def compress(v):
v[3] ^= v[0]
rotl(v, 2, 32)

v = [0x736f6d6570736575, 0x646f72616e646f6d, 0x6c7967656e657261, 0x7465646279746573]
v = [0x736F6D6570736575, 0x646F72616E646F6D, 0x6C7967656E657261, 0x7465646279746573]
offset = 0
while offset < len(m) - 7:
word = int.from_bytes(m[offset:offset + 8], 'little')
word = int.from_bytes(m[offset : offset + 8], "little")
v[3] ^= word
compress(v)
compress(v)
v[0] ^= word
offset += 8

buf = bytearray(8)
buf[:len(m) - offset] = m[offset:]
buf[7] = len(m) & 0xff
buf[: len(m) - offset] = m[offset:]
buf[7] = len(m) & 0xFF

word = int.from_bytes(buf, 'little')
word = int.from_bytes(buf, "little")
v[3] ^= word
compress(v)
compress(v)
Expand All @@ -63,79 +72,117 @@ def compress(v):
compress(v)

hash_val = ((v[0] ^ v[1]) & mask) + (((v[2] ^ v[3]) & mask) << 64)
s = '{:032x}'.format(hash_val)
return ''.join(s[i:i+2] for i in range(30,-2,-2))
s = f"{hash_val:032x}"
return "".join(s[i : i + 2] for i in range(30, -2, -2))

def error(s):

def is_valid_hex(s: str) -> bool:
    """Return True when *s* is non-empty and made up only of hex digits.

    Both lowercase and uppercase ``a``-``f`` are accepted.
    """
    if not s:
        return False
    for ch in s:
        if ch not in "0123456789abcdefABCDEF":
            return False
    return True


def error(s: str) -> None:
    """Print ``error: <s>`` to stderr and terminate with exit status 1."""
    print(f"error: {s}", file=sys.stderr)
    raise SystemExit(1)

# This is too slow, and doesn't seem important.
#def getFingerprint(text):
# words = re.findall(r'\b\w{4,}\b', text.decode()) # doesn't exactly match the JS code, but it doesn't have to
# triplets = [''.join(words[i:i+3]) for i in range(len(words) - 2)]
# uniq = set(triplets)
# hashes = [sipHash128(s.encode())[:8] for s in uniq]
# hashes.append('ffffffff')
# return min(hashes)

def load(url):
r = re.match(r"^(?:(?:(?:(?:(?:https?:)?//)?pastila\.nl)?/)?\?)?([a-f0-9]+)/([a-f0-9]+)(?:#(.+))?$", url)
if r is None: error('bad url')
fingerprint, hash_hex, key = r.groups()

response = requests.post('https://uzg8q0g12h.eu-central-1.aws.clickhouse.cloud/?user=paste', data=f"SELECT content, is_encrypted FROM data_view(fingerprint = '{fingerprint}', hash = '{hash_hex}') FORMAT JSON")
if not response.ok: error(f"{response} {response.content}")

j = json.loads(response.content)
if j['rows'] != 1: error("paste not found")
#if 'statistics' in j: sys.stderr.write(f"{j['statistics']}")
content, is_encrypted = j['data'][0]['content'], j['data'][0]['is_encrypted']

if is_encrypted:
if key is None: error("paste is encrypted, but the url contains no key (part after '#')")
key = base64.b64decode(key)
content = base64.b64decode(content)
cipher = Cipher(algorithms.AES(key), modes.CTR(b'\x00' * 16), backend=default_backend())
decryptor = cipher.decryptor()
decrypted = decryptor.update(content) + decryptor.finalize()
content = decrypted

return content

def save(data, encrypt):

def load(url: str) -> bytes:
    """Fetch the paste referenced by *url* and return its content as bytes.

    The query part of *url* must have the form ``<fingerprint>/<hash>``, both
    hexadecimal; anything after the first ``.`` in the hash part (``.diff``,
    ``.md`` etc.) is ignored. For an encrypted paste the URL fragment (the
    part after ``#``) must hold the base64-encoded AES key.

    Every failure -- malformed URL, failed request, unparsable server reply,
    paste not found, missing or unusable key -- is reported through
    ``error()``, which writes a message to stderr and exits with status 1.
    """
    parsed = urlparse(url)
    try:
        fingerprint, hash_hex = parsed.query.split("/", maxsplit=1)
    except ValueError:
        error(f"invalid url: {url}")
    hash_hex = hash_hex.split(".", maxsplit=1)[0]  # for .diff, .md etc
    key = parsed.fragment
    # Both values are interpolated into the query text below, so only plain
    # hex strings are allowed through.
    if not (is_valid_hex(fingerprint) and is_valid_hex(hash_hex)):
        error(f"invalid url: {url}")

    query = (
        "SELECT content, is_encrypted FROM "
        f"data_view(fingerprint = '{fingerprint}', hash = '{hash_hex}') FORMAT JSON"
    )

    req = Request(
        "https://uzg8q0g12h.eu-central-1.aws.clickhouse.cloud/?user=paste",
        data=query.encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urlopen(req) as response:
            body = response.read().decode("utf-8")
    except Exception as e:
        error(f"failed to fetch paste: {e}")

    try:
        j = json.loads(body)
    except ValueError as e:
        # json.JSONDecodeError is a ValueError; report it like other failures
        # instead of letting a traceback escape.
        error(f"unexpected response from server: {e}")
    if j["rows"] != 1:
        error("paste not found")
    # if 'statistics' in j: sys.stderr.write(f"{j['statistics']}")
    row = j["data"][0]
    content = row["content"]  # str
    is_encrypted = row["is_encrypted"]  # int, truthy when encrypted

    if not is_encrypted:
        return content.encode("utf-8")

    if not key:
        error("paste is encrypted, but no key provided in the URL")
    try:
        # b64decode raises binascii.Error (a ValueError) on bad padding and
        # AES() raises ValueError on an unsupported key length.
        aes = algorithms.AES(base64.b64decode(key))
    except ValueError as e:
        error(f"invalid key in the URL: {e}")
    try:
        decoded = base64.b64decode(content)
    except ValueError as e:
        error(f"paste content is not valid base64: {e}")

    # Same parameters as save(): AES-CTR with an all-zero initial counter block.
    cipher = Cipher(aes, modes.CTR(b"\x00" * 16), backend=default_backend())
    decryptor = cipher.decryptor()
    return decryptor.update(decoded) + decryptor.finalize()


def save(data: bytes) -> str:
    """Encrypt *data*, upload it and return the URL of the new paste.

    A fresh random 16-byte key is generated on every call. The data is
    encrypted with AES in CTR mode (all-zero initial counter block),
    base64-encoded and inserted through the ClickHouse HTTP endpoint. The key
    is not part of the uploaded row; it only appears after ``#`` in the
    returned URL. A failed upload is reported through ``error()``.
    """
    aes_key = os.urandom(16)
    encryptor = Cipher(
        algorithms.AES(aes_key),
        modes.CTR(b"\x00" * 16),
        backend=default_backend(),
    ).encryptor()
    ciphertext = encryptor.update(data) + encryptor.finalize()
    encoded = base64.b64encode(ciphertext)

    # The hash is taken over the base64 text that is actually stored.
    digest = sipHash128(encoded)
    fingerprint = f"{randint(0, 0xFFFFFFFF):08x}"

    row = json.dumps(
        {
            "fingerprint_hex": fingerprint,
            "hash_hex": digest,
            "content": encoded.decode(),
            "is_encrypted": 1,
        }
    )
    statement = (
        "INSERT INTO data (fingerprint_hex, hash_hex, content, is_encrypted) "
        f"FORMAT JSONEachRow {row}"
    )
    request = Request(
        "https://uzg8q0g12h.eu-central-1.aws.clickhouse.cloud/?user=paste",
        data=statement.encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        # Only the success of the request matters; the reply body is unused.
        with urlopen(request):
            pass
    except Exception as e:
        error(f"failed to save paste: {e}")

    key_b64 = base64.b64encode(aes_key).decode()
    return f"https://pastila.nl/?{fingerprint}/{digest}#{key_b64}"


if __name__ == "__main__":
    # No argument: upload stdin and print the paste URL.
    # One argument: treat it as a paste URL and write its content to stdout.
    argc = len(sys.argv)
    if argc not in (1, 2):
        print("usage: pastila.py [url]")
        sys.exit(1)

    if argc == 2:
        data = load(sys.argv[1])
        sys.stdout.buffer.write(data)
    else:
        data = sys.stdin.buffer.read()
        print(save(data))