Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions packages/pg-protocol/src/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ const enum code {
copyFail = 0x66,
}

const writer = new Writer()

const startup = (opts: Record<string, string>): Buffer => {
const writer = new Writer()
// protocol version
writer.addInt16(3).addInt16(0)
for (const key of Object.keys(opts)) {
Expand All @@ -43,22 +42,24 @@ const requestSsl = (): Buffer => {
}

const password = (password: string): Buffer => {
return writer.addCString(password).flush(code.startup)
return new Writer().addCString(password).flush(code.startup)
}

const sendSASLInitialResponseMessage = function (mechanism: string, initialResponse: string): Buffer {
const writer = new Writer()

// 0x70 = 'p'
writer.addCString(mechanism).addInt32(Buffer.byteLength(initialResponse)).addString(initialResponse)

return writer.flush(code.startup)
}

const sendSCRAMClientFinalMessage = function (additionalData: string): Buffer {
return writer.addString(additionalData).flush(code.startup)
return new Writer().addString(additionalData).flush(code.startup)
}

const query = (text: string): Buffer => {
return writer.addCString(text).flush(code.query)
return new Writer().addCString(text).flush(code.query)
}

type ParseOpts = {
Expand All @@ -70,6 +71,7 @@ type ParseOpts = {
const emptyArray: any[] = []

const parse = (query: ParseOpts): Buffer => {
const writer = new Writer()
// expect something like this:
// { name: 'queryName',
// text: 'select * from blah',
Expand Down Expand Up @@ -110,15 +112,15 @@ type BindOpts = {
valueMapper?: ValueMapper
}

const paramWriter = new Writer()

// make this a const enum so typescript will inline the value
const enum ParamType {
STRING = 0,
BINARY = 1,
}

const writeValues = function (values: any[], valueMapper?: ValueMapper): void {
const writeValues = function (writer: Writer, values: any[], valueMapper?: ValueMapper): Buffer {
const paramWriter = new Writer()

for (let i = 0; i < values.length; i++) {
const mappedVal = valueMapper ? valueMapper(values[i], i) : values[i]
if (mappedVal == null) {
Expand All @@ -139,9 +141,13 @@ const writeValues = function (values: any[], valueMapper?: ValueMapper): void {
paramWriter.addString(mappedVal)
}
}

return paramWriter.flush()
}

const bind = (config: BindOpts = {}): Buffer => {
const writer = new Writer()

// normalize config
const portal = config.portal || ''
const statement = config.statement || ''
Expand All @@ -152,10 +158,10 @@ const bind = (config: BindOpts = {}): Buffer => {
writer.addCString(portal).addCString(statement)
writer.addInt16(len)

writeValues(values, config.valueMapper)
const paramValues = writeValues(writer, values, config.valueMapper)

writer.addInt16(len)
writer.add(paramWriter.flush())
writer.add(paramValues)

// all results use the same format code
writer.addInt16(1)
Expand Down Expand Up @@ -219,8 +225,8 @@ const cstringMessage = (code: code, string: string): Buffer => {
return buffer
}

const emptyDescribePortal = writer.addCString('P').flush(code.describe)
const emptyDescribeStatement = writer.addCString('S').flush(code.describe)
const emptyDescribePortal = new Writer().addCString('P').flush(code.describe)
const emptyDescribeStatement = new Writer().addCString('S').flush(code.describe)

const describe = (msg: PortalOpts): Buffer => {
return msg.name
Expand All @@ -236,7 +242,7 @@ const close = (msg: PortalOpts): Buffer => {
}

const copyData = (chunk: Buffer): Buffer => {
return writer.add(chunk).flush(code.copyFromChunk)
return new Writer().add(chunk).flush(code.copyFromChunk)
}

const copyFail = (message: string): Buffer => {
Expand Down
4 changes: 4 additions & 0 deletions packages/pg/lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ class Query extends EventEmitter {
valueMapper: utils.prepareValue,
})
} catch (err) {
// we should close parse to avoid leaking connections
connection.close({ type: 'S', name: this.name })
connection.sync()

this.handleError(err, connection)
return
}
Expand Down