Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cassandra/numpy_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ cdef _parse_rows(BytesIOReader reader, ParseDesc desc,
cdef Py_ssize_t i

for i in range(rowcount):
unpack_row(reader, desc, arrs)
unpack_plain_row(reader, desc, arrs)


### Helper functions to create NumPy arrays and array descriptors
Expand Down Expand Up @@ -144,7 +144,7 @@ def make_array(coltype, array_size):

@cython.boundscheck(False)
@cython.wraparound(False)
cdef inline int unpack_row(
cdef inline int unpack_plain_row(
BytesIOReader reader, ParseDesc desc, ArrDesc *arrays) except -1:
cdef Buffer buf
cdef Py_ssize_t i, rowsize = desc.rowsize
Expand Down
47 changes: 40 additions & 7 deletions cassandra/obj_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ cdef class ListParser(ColumnParser):
cdef Py_ssize_t i, rowcount
rowcount = read_int(reader)
cdef RowParser rowparser = TupleRowParser()
return [rowparser.unpack_row(reader, desc) for i in range(rowcount)]
if desc.column_encryption_policy:
return [rowparser.unpack_col_encrypted_row(reader, desc) for i in range(rowcount)]
else:
return [rowparser.unpack_plain_row(reader, desc) for i in range(rowcount)]


cdef class LazyParser(ColumnParser):
Expand All @@ -47,17 +50,22 @@ def parse_rows_lazy(BytesIOReader reader, ParseDesc desc):
cdef Py_ssize_t i, rowcount
rowcount = read_int(reader)
cdef RowParser rowparser = TupleRowParser()
return (rowparser.unpack_row(reader, desc) for i in range(rowcount))
if desc.column_encryption_policy:
return (rowparser.unpack_col_encrypted_row(reader, desc) for i in range(rowcount))
else:
return (rowparser.unpack_plain_row(reader, desc) for i in range(rowcount))


cdef class TupleRowParser(RowParser):
"""
Parse a single returned row into a tuple of objects:

(obj1, ..., objN)
If CE (Column encryption) policy is enabled - use unpack_col_encrypted_row(),
otherwsise use unpack_plain_row()
"""

cpdef unpack_row(self, BytesIOReader reader, ParseDesc desc):
cpdef unpack_col_encrypted_row(self, BytesIOReader reader, ParseDesc desc):
assert desc.rowsize >= 0

cdef Buffer buf
Expand All @@ -73,9 +81,9 @@ cdef class TupleRowParser(RowParser):

# Deserialize bytes to python object
deserializer = desc.deserializers[i]
coldesc = desc.coldescs[i]
uses_ce = ce_policy and ce_policy.contains_column(coldesc)
try:
coldesc = desc.coldescs[i]
uses_ce = ce_policy.contains_column(coldesc)
if uses_ce:
col_type = ce_policy.column_type(coldesc)
decrypted_bytes = ce_policy.decrypt(coldesc, to_bytes(&buf))
Expand All @@ -84,11 +92,36 @@ cdef class TupleRowParser(RowParser):
val = from_binary(deserializer, &newbuf, desc.protocol_version)
else:
val = from_binary(deserializer, &buf, desc.protocol_version)
# Insert new object into tuple
tuple_set(res, i, val)
except Exception as e:
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (desc.colnames[i],
desc.coltypes[i].cql_parameterized_type(),
str(e)))

return res

cpdef unpack_plain_row(self, BytesIOReader reader, ParseDesc desc):
assert desc.rowsize >= 0

cdef Buffer buf
cdef Py_ssize_t i, rowsize = desc.rowsize
cdef Deserializer deserializer
cdef tuple res = tuple_new(desc.rowsize)

for i in range(rowsize):
# Read the next few bytes
get_buf(reader, &buf)

# Deserialize bytes to python object
deserializer = desc.deserializers[i]
try:
val = from_binary(deserializer, &buf, desc.protocol_version)
# Insert new object into tuple
tuple_set(res, i, val)
except Exception as e:
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (desc.colnames[i],
desc.coltypes[i].cql_parameterized_type(),
str(e)))
# Insert new object into tuple
tuple_set(res, i, val)

return res
2 changes: 1 addition & 1 deletion cassandra/parsing.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ cdef class ColumnParser:
cpdef parse_rows(self, BytesIOReader reader, ParseDesc desc)

cdef class RowParser:
cpdef unpack_row(self, BytesIOReader reader, ParseDesc desc)
cpdef unpack_plain_row(self, BytesIOReader reader, ParseDesc desc)

2 changes: 1 addition & 1 deletion cassandra/parsing.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ cdef class ColumnParser:
cdef class RowParser:
"""Parser for a single row"""

cpdef unpack_row(self, BytesIOReader reader, ParseDesc desc):
cpdef unpack_plain_row(self, BytesIOReader reader, ParseDesc desc):
"""
Unpack a single row of data in a ResultMessage.
"""
Expand Down
31 changes: 22 additions & 9 deletions cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,24 +719,37 @@ def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata,
rows = [self.recv_row(f, len(column_metadata)) for _ in range(rowcount)]
self.column_names = [c[2] for c in column_metadata]
self.column_types = [c[3] for c in column_metadata]
col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata]

def decode_val(val, col_md, col_desc):
uses_ce = column_encryption_policy and column_encryption_policy.contains_column(col_desc)
col_type = column_encryption_policy.column_type(col_desc) if uses_ce else col_md[3]
raw_bytes = column_encryption_policy.decrypt(col_desc, val) if uses_ce else val
return col_type.from_binary(raw_bytes, protocol_version)
if column_encryption_policy:
col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata]

def decode_row(row):
return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs))
def decode_val(val, col_md, col_desc):
uses_ce = column_encryption_policy.contains_column(col_desc)
if uses_ce:
col_type = column_encryption_policy.column_type(col_desc)
raw_bytes = column_encryption_policy.decrypt(col_desc, val)
return col_type.from_binary(raw_bytes, protocol_version)
else:
return col_md[3].from_binary(val, protocol_version)

def decode_row(row):
return tuple(decode_val(val, col_md, col_desc) for val, col_md, col_desc in zip(row, column_metadata, col_descs))
else:
def decode_row(row):
return tuple(col_md[3].from_binary(val, protocol_version) for val, col_md in zip(row, column_metadata))

try:
self.parsed_rows = [decode_row(row) for row in rows]
except Exception:
if not column_encryption_policy:
col_descs = [ColDesc(md[0], md[1], md[2]) for md in column_metadata]
for row in rows:
for val, col_md, col_desc in zip(row, column_metadata, col_descs):
try:
decode_val(val, col_md, col_desc)
if column_encryption_policy:
decode_val(val, col_md, col_desc)
else:
col_md[3].from_binary(val, protocol_version)
except Exception as e:
raise DriverException('Failed decoding result column "%s" of type %s: %s' % (col_md[2],
col_md[3].cql_parameterized_type(),
Expand Down
65 changes: 44 additions & 21 deletions cassandra/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,28 +636,51 @@ def bind(self, values):

self.raw_values = values
self.values = []
for value, col_spec in zip(values, col_meta):
if value is None:
self.values.append(None)
elif value is UNSET_VALUE:
if proto_version >= 4:
self._append_unset_value()
if ce_policy:
for value, col_spec in zip(values, col_meta):
if value is None:
self.values.append(None)
elif value is UNSET_VALUE:
if proto_version >= 4:
self._append_unset_value()
else:
raise ValueError("Attempt to bind UNSET_VALUE while using unsuitable protocol version (%d < 4)" % proto_version)
else:
raise ValueError("Attempt to bind UNSET_VALUE while using unsuitable protocol version (%d < 4)" % proto_version)
else:
try:
col_desc = ColDesc(col_spec.keyspace_name, col_spec.table_name, col_spec.name)
uses_ce = ce_policy and ce_policy.contains_column(col_desc)
col_type = ce_policy.column_type(col_desc) if uses_ce else col_spec.type
col_bytes = col_type.serialize(value, proto_version)
if uses_ce:
col_bytes = ce_policy.encrypt(col_desc, col_bytes)
self.values.append(col_bytes)
except (TypeError, struct.error) as exc:
actual_type = type(value)
message = ('Received an argument of invalid type for column "%s". '
'Expected: %s, Got: %s; (%s)' % (col_spec.name, col_spec.type, actual_type, exc))
raise TypeError(message)
try:
col_desc = ColDesc(col_spec.keyspace_name, col_spec.table_name, col_spec.name)
uses_ce = ce_policy.contains_column(col_desc)
if uses_ce:
col_type = ce_policy.column_type(col_desc)
col_bytes = col_type.serialize(value, proto_version)
col_bytes = ce_policy.encrypt(col_desc, col_bytes)
else:
col_type = col_spec.type
col_bytes = col_type.serialize(value, proto_version)
self.values.append(col_bytes)
except (TypeError, struct.error) as exc:
actual_type = type(value)
message = ('Received an argument of invalid type for column "%s". '
'Expected: %s, Got: %s; (%s)' % (col_spec.name, col_spec.type, actual_type, exc))
raise TypeError(message)
else:
for value, col_spec in zip(values, col_meta):
if value is None:
self.values.append(None)
elif value is UNSET_VALUE:
if proto_version >= 4:
self._append_unset_value()
else:
raise ValueError("Attempt to bind UNSET_VALUE while using unsuitable protocol version (%d < 4)" % proto_version)
else:
try:
col_type = col_spec.type
col_bytes = col_type.serialize(value, proto_version)
self.values.append(col_bytes)
except (TypeError, struct.error) as exc:
actual_type = type(value)
message = ('Received an argument of invalid type for column "%s". '
'Expected: %s, Got: %s; (%s)' % (col_spec.name, col_spec.type, actual_type, exc))
raise TypeError(message)

if proto_version >= 4:
diff = col_meta_len - len(self.values)
Expand Down
8 changes: 6 additions & 2 deletions cassandra/row_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ def make_recv_results_rows(ColumnParser colparser):
reader.buf_ptr = reader.buf
reader.pos = 0
rowcount = read_int(reader)
for i in range(rowcount):
rowparser.unpack_row(reader, desc)
if desc.column_encryption_policy:
for i in range(rowcount):
rowparser.unpack_col_encrypted_row(reader, desc)
else:
for i in range(rowcount):
rowparser.unpack_plain_row(reader, desc)

return recv_results_rows
Loading
Loading