Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions deno/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ const sql = postgres('postgres://username:password@host:port/database', {
username : '', // Username of database user
password : '', // Password of database user
ssl : false, // true, prefer, require, tls.connect options
sslnegotiation : null, // direct
max : 10, // Max number of connections
max_lifetime : null, // Max lifetime in seconds (more info below)
idle_timeout : 0, // Idle connection timeout in seconds
Expand Down Expand Up @@ -1327,6 +1328,11 @@ This error is thrown for any queries that were pending when the timeout to [`sql

This error is thrown if the startup phase of the connection (tcp, protocol negotiation, and auth) took more than the default 30 seconds or what was specified using `connect_timeout` or `PGCONNECT_TIMEOUT`.

##### COPY_IN_PROGRESS
> You cannot execute queries during copy

This error is thrown if trying to run a query during a copy operation (writable / readable).

## TypeScript support

`postgres` has TypeScript support. You can pass a row list type for your queries in this way:
Expand Down
196 changes: 14 additions & 182 deletions deno/polyfills.js
Original file line number Diff line number Diff line change
@@ -1,189 +1,21 @@
/* global Deno */

import { Buffer } from 'https://deno.land/[email protected]/node/buffer.ts'
import { isIP } from 'https://deno.land/[email protected]/node/net.ts'
import { Buffer } from 'node:buffer'

const events = () => ({ data: [], error: [], drain: [], connect: [], secureConnect: [], close: [] })
const enc = new TextEncoder()

class Socket {
constructor() {
return createSocket()
}
}

function createSocket() {
let paused
, resume
, keepAlive

const socket = {
error,
success,
readyState: 'open',
setKeepAlive: x => {
keepAlive = x
socket.raw && socket.raw.setKeepAlive && socket.raw.setKeepAlive(x)
},
connect: (port, hostname) => {
socket.raw = null
socket.readyState = 'connecting'
typeof port === 'string'
? Deno.connect({ transport: 'unix', path: socket.path = port }).then(success, error)
: Deno.connect({ transport: 'tcp', port: socket.port = port, hostname: socket.hostname = hostname || 'localhost' }).then(success, error) // eslint-disable-line
return socket
},
pause: () => {
paused = new Promise(r => resume = r)
},
resume: () => {
resume && resume()
paused = null
},
isPaused: () => !!paused,
removeAllListeners: () => socket.events = events(),
events: events(),
raw: null,
on: (x, fn) => socket.events[x].push(fn),
once: (x, fn) => {
if (x === 'data')
socket.break = true
const e = socket.events[x]
e.push(once)
once.once = fn
function once(...args) {
fn(...args)
e.indexOf(once) > -1 && e.splice(e.indexOf(once), 1)
}
},
removeListener: (x, fn) => {
socket.events[x] = socket.events[x].filter(x => x !== fn && x.once !== fn)
},
write: (x, cb) => {
socket.raw.write(x).then(l => {
l < x.length
? socket.write(x.slice(l), cb)
: (cb && cb(null))
}).catch(err => {
cb && cb()
call(socket.events.error, err)
})
return false
},
destroy: () => close(),
end: (x) => {
x && socket.write(x)
close()
}
}

return socket

async function success(raw) {
if (socket.readyState !== 'connecting')
return raw.close()

const encrypted = socket.encrypted
socket.raw = raw
keepAlive != null && raw.setKeepAlive && raw.setKeepAlive(keepAlive)
socket.readyState = 'open'
socket.encrypted
? call(socket.events.secureConnect)
: call(socket.events.connect)

const b = new Uint8Array(1024)
let result

try {
while ((result = socket.readyState === 'open' && await raw.read(b))) {
call(socket.events.data, Buffer.from(b.subarray(0, result)))
if (!encrypted && socket.break && (socket.break = false, b[0] === 83))
return socket.break = false
paused && await paused
}
} catch (e) {
if (e instanceof Deno.errors.BadResource === false)
error(e)
}

if (!socket.encrypted || encrypted)
closed()
}

function close() {
try {
socket.raw && socket.raw.close()
} catch (e) {
if (e instanceof Deno.errors.BadResource === false)
call(socket.events.error, e)
}
}
export const HmacSha256 = async (key, x) => {
const keyBytes = typeof key === "string" ? enc.encode(key) : key
const dataBytes = typeof x === "string" ? enc.encode(x) : x

function closed() {
if (socket.readyState === 'closed')
return
const cryptoKey = await crypto.subtle.importKey(
"raw",
keyBytes,
{ name: "HMAC", hash: "SHA-256" },
false,
["sign"],
)

socket.break = socket.encrypted = false
socket.readyState = 'closed'
call(socket.events.close)
}

function error(err) {
call(socket.events.error, err)
socket.raw
? close()
: closed()
}

function call(xs, x) {
xs.slice().forEach(fn => fn(x))
}
}

export const net = {
isIP,
createServer() {
const server = {
address() {
return { port: 9876 }
},
async listen() {
server.raw = Deno.listen({ port: 9876, transport: 'tcp' })
for await (const conn of server.raw)
setTimeout(() => conn.close(), 500)
},
close() {
server.raw.close()
}
}
return server
},
Socket
}

export const tls = {
connect({ socket, ...options }) {
socket.encrypted = true
socket.readyState = 'connecting'
Deno.startTls(socket.raw, { hostname: socket.hostname, ...options })
.then(socket.success, socket.error)
socket.raw = null
return socket
}
}

let ids = 1
const tasks = new Set()
export const setImmediate = fn => {
const id = ids++
tasks.add(id)
queueMicrotask(() => {
if (tasks.has(id)) {
fn()
tasks.delete(id)
}
})
return id
const mac = await crypto.subtle.sign("HMAC", cryptoKey, dataBytes)
return Buffer.from(mac)
}

export const clearImmediate = id => tasks.delete(id)

2 changes: 1 addition & 1 deletion deno/src/bytes.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Buffer } from 'https://deno.land/[email protected]/node/buffer.ts'
import { Buffer } from 'node:buffer'
const size = 256
let buffer = Buffer.allocUnsafe(size)

Expand Down
69 changes: 39 additions & 30 deletions deno/src/connection.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { HmacSha256 } from 'https://deno.land/[email protected]/hash/sha256.ts'
import { Buffer } from 'https://deno.land/[email protected]/node/buffer.ts'
import { setImmediate, clearImmediate } from '../polyfills.js'
import { net } from '../polyfills.js'
import { tls } from '../polyfills.js'
import crypto from 'https://deno.land/[email protected]/node/crypto.ts'
import Stream from 'https://deno.land/[email protected]/node/stream.ts'
import { HmacSha256 } from '../polyfills.js'
import { Buffer } from 'node:buffer'
import { setImmediate, clearImmediate } from 'node:timers'
import net from 'node:net'
import tls from 'node:tls'
import crypto from 'node:crypto'
import Stream from 'node:stream'


import { stringify, handleValue, arrayParser, arraySerializer } from './types.js'
Expand Down Expand Up @@ -54,6 +54,7 @@ const errorFields = {

function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose = noop } = {}) {
const {
sslnegotiation,
ssl,
max,
user,
Expand Down Expand Up @@ -87,7 +88,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
, statements = {}
, statementId = Math.random().toString(36).slice(2)
, statementCount = 1
, closedDate = 0
, closedTime = 0
, remaining = 0
, hostIndex = 0
, retries = 0
Expand Down Expand Up @@ -157,7 +158,10 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
function execute(q) {
if (terminated)
return queryError(q, Errors.connection('CONNECTION_DESTROYED', options))


if (stream)
return queryError(q, Errors.generic('COPY_IN_PROGRESS', 'You cannot execute queries during copy'))

if (q.cancelled)
return

Expand Down Expand Up @@ -262,25 +266,29 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

async function secure() {
write(SSLRequest)
const canSSL = await new Promise(r => socket.once('data', x => r(x[0] === 83))) // S

if (!canSSL && ssl === 'prefer')
return connected()

socket.removeAllListeners()
socket = tls.connect({
if (sslnegotiation !== 'direct') {
write(SSLRequest)
const canSSL = await new Promise(r => socket.once('data', x => r(x[0] === 83))) // S

if (!canSSL && ssl === 'prefer')
return connected()
}

const options = {
socket,
servername: net.isIP(socket.host) ? undefined : socket.host,
...(ssl === 'require' || ssl === 'allow' || ssl === 'prefer'
? { rejectUnauthorized: false }
: ssl === 'verify-full'
? {}
: typeof ssl === 'object'
? ssl
: {}
)
})
}

if (sslnegotiation === 'direct')
options.ALPNProtocols = ['postgresql']

if (ssl === 'require' || ssl === 'allow' || ssl === 'prefer')
options.rejectUnauthorized = false
else if (typeof ssl === 'object')
Object.assign(options, ssl)

socket.removeAllListeners()
socket = tls.connect(options)
socket.on('secureConnect', connected)
socket.on('error', error)
socket.on('close', closed)
Expand Down Expand Up @@ -353,7 +361,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
}

function reconnect() {
setTimeout(connect, closedDate ? closedDate + delay - performance.now() : 0)
setTimeout(connect, closedTime ? Math.max(0, closedTime + delay - performance.now()) : 0)
}

function connected() {
Expand All @@ -364,7 +372,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
statementCount = 1
lifeTimer.start()
socket.on('data', data)
keep_alive && socket.setKeepAlive && socket.setKeepAlive(true)
keep_alive && socket.setKeepAlive && socket.setKeepAlive(true, 1000 * keep_alive)
const s = StartupMessage()
write(s)
} catch (err) {
Expand Down Expand Up @@ -445,7 +453,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
return reconnect()

!hadError && (query || sent.length) && error(Errors.connection('CONNECTION_CLOSED', options, socket))
closedDate = performance.now()
closedTime = performance.now()
hadError && options.shared.retries++
delay = (typeof backoff === 'function' ? backoff(options.shared.retries) : backoff) * 1000
onclose(connection, Errors.connection('CONNECTION_CLOSED', options, socket))
Expand Down Expand Up @@ -852,6 +860,7 @@ function Connection(options, queues = {}, { onopen = noop, onend = noop, onclose
final(callback) {
socket.write(b().c().end())
final = callback
stream = null
}
})
query.resolve(stream)
Expand Down Expand Up @@ -1007,7 +1016,7 @@ function md5(x) {
}

function hmac(key, x) {
return Buffer.from(new HmacSha256(key).update(x).digest())
return HmacSha256(key, x)
}

function sha256(x) {
Expand Down
11 changes: 6 additions & 5 deletions deno/src/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import process from 'https://deno.land/[email protected]/node/process.ts'
import os from 'https://deno.land/[email protected]/node/os.ts'
import fs from 'https://deno.land/[email protected]/node/fs.ts'
import process from 'node:process'
import os from 'node:os'
import fs from 'node:fs'

import {
mergeUserTypes,
Expand Down Expand Up @@ -447,8 +447,9 @@ function parseOptions(a, b) {

const ints = ['idle_timeout', 'connect_timeout', 'max_lifetime', 'max_pipeline', 'backoff', 'keep_alive']
const defaults = {
max : 10,
max : globalThis.Cloudflare ? 3 : 10,
ssl : false,
sslnegotiation : null,
idle_timeout : null,
connect_timeout : 30,
max_lifetime : max_lifetime,
Expand Down Expand Up @@ -482,8 +483,8 @@ function parseOptions(a, b) {
{}
),
connection : {
application_name: env.PGAPPNAME || 'postgres.js',
...o.connection,
application_name: o.connection?.application_name ?? env.PGAPPNAME ?? 'postgres.js',
...Object.entries(query).reduce((acc, [k, v]) => (k in defaults || (acc[k] = v), acc), {})
},
types : o.types || {},
Expand Down
Loading