diff --git a/CHANGELOG.md b/CHANGELOG.md index 517a60bf..affa0e63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,22 +7,53 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased] ### Added + - New feature: Support for macOS and Linux. - Documentation: Added API documentation in the Wiki. +- Added support for SQL Server spatial data types (geography, geometry, hierarchyid) via SQL_SS_UDT type handling. +- Added `SQLTypeCode` class for dual-compatible type codes in `cursor.description`. ### Changed + - Improved error handling in the connection module. +- Enhanced `cursor.description[i][1]` to return `SQLTypeCode` objects that compare equal to both SQL type integers and Python types, maintaining full backwards compatibility while aligning with DB-API 2.0. ### Fixed + - Bug fix: Resolved issue with connection timeout. +- Fixed `cursor.description` type handling for better DB-API 2.0 compliance (Issue #352). + +### SQLTypeCode Usage + +The `type_code` field in `cursor.description` now returns `SQLTypeCode` objects that support both comparison styles: + +```python +cursor.execute("SELECT id, name FROM users") +desc = cursor.description + +# Style 1: Compare with Python types (backwards compatible with pandas, etc.) +if desc[0][1] == int: + print("Integer column") + +# Style 2: Compare with SQL type codes (DB-API 2.0 compliant) +from mssql_python.constants import ConstantsDDBC as sql_types +if desc[0][1] == sql_types.SQL_INTEGER.value: # or just == 4 + print("Integer column") + +# Get the raw SQL type code +type_code = int(desc[0][1]) # Returns 4 for SQL_INTEGER +``` ## [1.0.0-alpha] - 2025-02-24 ### Added + - Initial release of the mssql-python driver for SQL Server. ### Changed + - N/A ### Fixed -- N/A \ No newline at end of file + +- N/A diff --git a/mssql_python/__init__.py b/mssql_python/__init__.py index db2a9864..30f04516 100644 --- a/mssql_python/__init__.py +++ b/mssql_python/__init__.py @@ -59,7 +59,7 @@ from .connection_string_builder import _ConnectionStringBuilder # Cursor Objects -from .cursor import Cursor +from .cursor import Cursor, SQLTypeCode # Logging Configuration (Simplified single-level DEBUG system) from .logging import logger, setup_logging, driver_logger diff --git a/mssql_python/constants.py b/mssql_python/constants.py index 03d40c83..c2482276 100644 --- a/mssql_python/constants.py +++ b/mssql_python/constants.py @@ -114,7 +114,12 @@ class ConstantsDDBC(Enum): SQL_FETCH_ABSOLUTE = 5 SQL_FETCH_RELATIVE = 6 SQL_FETCH_BOOKMARK = 8 + # NOTE: The following SQL Server-specific type constants MUST stay in sync with + # the corresponding values in mssql_python/pybind/ddbc_bindings.cpp SQL_DATETIMEOFFSET = -155 + SQL_SS_TIME2 = -154 # SQL Server TIME(n) type + SQL_SS_UDT = -151 # SQL Server User-Defined Types (geometry, geography, hierarchyid) + SQL_SS_XML = -152 # SQL Server XML type SQL_C_SS_TIMESTAMPOFFSET = 0x4001 SQL_SCOPE_CURROW = 0 SQL_BEST_ROWID = 1 diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py index dfd47375..72ca0adb 100644 --- a/mssql_python/cursor.py +++ b/mssql_python/cursor.py @@ -48,6 +48,103 @@ MONEY_MAX: decimal.Decimal = decimal.Decimal("922337203685477.5807") +class SQLTypeCode: + """ + A dual-compatible type code that compares equal to both SQL type integers and Python types. + + This class maintains backwards compatibility with code that checks + `cursor.description[i][1] == str` while also supporting DB-API 2.0 + compliant code that checks `cursor.description[i][1] == -9`. 
+
+    Examples:
+        >>> type_code = SQLTypeCode(-9, str)
+        >>> type_code == str  # Backwards compatible with pandas, etc.
+        True
+        >>> type_code == -9  # DB-API 2.0 compliant
+        True
+        >>> int(type_code)  # Get the raw SQL type code
+        -9
+    """
+
+    # SQL type code to Python type mapping (class-level cache)
+    _type_map = None
+
+    def __init__(self, type_code: int, python_type: Optional[type] = None):
+        self.type_code = type_code
+        # If python_type is not provided, look it up from the mapping
+        if python_type is None:
+            python_type = self._get_python_type(type_code)
+        self.python_type = python_type
+
+    @classmethod
+    def _get_type_map(cls):
+        """Lazily build the SQL-to-Python type mapping."""
+        if cls._type_map is None:
+            cls._type_map = {
+                ddbc_sql_const.SQL_CHAR.value: str,
+                ddbc_sql_const.SQL_VARCHAR.value: str,
+                ddbc_sql_const.SQL_LONGVARCHAR.value: str,
+                ddbc_sql_const.SQL_WCHAR.value: str,
+                ddbc_sql_const.SQL_WVARCHAR.value: str,
+                ddbc_sql_const.SQL_WLONGVARCHAR.value: str,
+                ddbc_sql_const.SQL_INTEGER.value: int,
+                ddbc_sql_const.SQL_REAL.value: float,
+                ddbc_sql_const.SQL_FLOAT.value: float,
+                ddbc_sql_const.SQL_DOUBLE.value: float,
+                ddbc_sql_const.SQL_DECIMAL.value: decimal.Decimal,
+                ddbc_sql_const.SQL_NUMERIC.value: decimal.Decimal,
+                ddbc_sql_const.SQL_DATE.value: datetime.date,
+                ddbc_sql_const.SQL_TIMESTAMP.value: datetime.datetime,
+                ddbc_sql_const.SQL_TIME.value: datetime.time,
+                ddbc_sql_const.SQL_SS_TIME2.value: datetime.time,  # SQL Server TIME(n)
+                ddbc_sql_const.SQL_BIT.value: bool,
+                ddbc_sql_const.SQL_TINYINT.value: int,
+                ddbc_sql_const.SQL_SMALLINT.value: int,
+                ddbc_sql_const.SQL_BIGINT.value: int,
+                ddbc_sql_const.SQL_BINARY.value: bytes,
+                ddbc_sql_const.SQL_VARBINARY.value: bytes,
+                ddbc_sql_const.SQL_LONGVARBINARY.value: bytes,
+                ddbc_sql_const.SQL_GUID.value: uuid.UUID,
+                ddbc_sql_const.SQL_SS_UDT.value: bytes,
+                ddbc_sql_const.SQL_SS_XML.value: str,  # SQL Server XML type (-152)
+                ddbc_sql_const.SQL_DATETIME2.value: datetime.datetime,
+                ddbc_sql_const.SQL_SMALLDATETIME.value: datetime.datetime,
+                ddbc_sql_const.SQL_DATETIMEOFFSET.value: datetime.datetime,
+            }
+        return cls._type_map
+
+    @classmethod
+    def _get_python_type(cls, sql_code: int) -> type:
+        """Get the Python type for a SQL type code."""
+        return cls._get_type_map().get(sql_code, str)
+
+    def __eq__(self, other):
+        """Compare equal to both Python types and SQL integer codes."""
+        if isinstance(other, type):
+            return self.python_type == other
+        if isinstance(other, int):
+            return self.type_code == other
+        if isinstance(other, SQLTypeCode):
+            return self.type_code == other.type_code
+        return False
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash(self.type_code)
+
+    def __int__(self):
+        return self.type_code
+
+    def __repr__(self):
+        type_name = self.python_type.__name__ if self.python_type else "Unknown"
+        return f"SQLTypeCode({self.type_code}, {type_name})"
+
+    def __str__(self):
+        return str(self.type_code)
+
+
class Cursor:  # pylint: disable=too-many-instance-attributes,too-many-public-methods
    """
    Represents a database cursor, which is used to manage the context of a fetch operation.
@@ -142,6 +239,9 @@ def __init__(self, connection: "Connection", timeout: int = 0) -> None:
        )
        self.messages = []  # Store diagnostic messages

+        # Store raw column metadata for converter lookups
+        self._column_metadata = None
+
    def _is_unicode_string(self, param: str) -> bool:
        """
        Check if a string contains non-ASCII characters.
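
A note on the semantics added above: `SQLTypeCode.__eq__` is dual (it matches both Python types and SQL integer codes), but `__hash__` follows only the integer code. The sketch below is editorial rather than part of the patch; it assumes the `SQLTypeCode` class exactly as defined in this hunk and shows what that asymmetry means for hash-based containers.

```python
from mssql_python import SQLTypeCode

tc = SQLTypeCode(4)  # SQL_INTEGER

# Dual equality, as documented in the class docstring.
assert tc == int and tc == 4

# Containers keyed by raw SQL codes work: hash(tc) == hash(4), and the
# stored key 4 compares equal to tc via the reflected __eq__.
converters = {4: "integer converter"}
assert converters.get(tc) == "integer converter"

# Containers keyed by Python *types* do not match: hash(tc) != hash(int),
# so the lookup fails before __eq__ is ever consulted.
handlers = {int: "integer handler"}
assert handlers.get(tc) is None
```

In short, `==` succeeds in both directions, but dict and set lookups behave like plain integer codes, not like Python types.
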
@@ -756,6 +856,7 @@ def close(self) -> None: self.hstmt = None logger.debug("SQLFreeHandle succeeded") self._clear_rownumber() + self._column_metadata = None # Clear metadata to prevent memory leaks self.closed = True def _check_closed(self) -> None: @@ -942,8 +1043,12 @@ def _initialize_description(self, column_metadata: Optional[Any] = None) -> None """Initialize the description attribute from column metadata.""" if not column_metadata: self.description = None + self._column_metadata = None # Clear metadata too return + # Store raw metadata for converter map building + self._column_metadata = column_metadata + description = [] for _, col in enumerate(column_metadata): # Get column name - lowercase it if the lowercase flag is set @@ -954,10 +1059,13 @@ def _initialize_description(self, column_metadata: Optional[Any] = None) -> None column_name = column_name.lower() # Add to description tuple (7 elements as per PEP-249) + # Use SQLTypeCode for backwards-compatible type_code that works with both + # `desc[1] == str` (pandas) and `desc[1] == -9` (DB-API 2.0) + sql_type = col["DataType"] description.append( ( column_name, # name - self._map_data_type(col["DataType"]), # type_code + SQLTypeCode(sql_type), # type_code - dual compatible None, # display_size col["ColumnSize"], # internal_size col["ColumnSize"], # precision - should match ColumnSize @@ -975,6 +1083,7 @@ def _build_converter_map(self): """ if ( not self.description + or not self._column_metadata or not hasattr(self.connection, "_output_converters") or not self.connection._output_converters ): @@ -982,11 +1091,9 @@ def _build_converter_map(self): converter_map = [] - for desc in self.description: - if desc is None: - converter_map.append(None) - continue - sql_type = desc[1] + for col_meta in self._column_metadata: + # Use the raw SQL type code from metadata, not the mapped Python type + sql_type = col_meta["DataType"] converter = self.connection.get_output_converter(sql_type) # If no converter found for the SQL type, try the WVARCHAR converter as a fallback if converter is None: @@ -1022,41 +1129,6 @@ def _get_column_and_converter_maps(self): return column_map, converter_map - def _map_data_type(self, sql_type): - """ - Map SQL data type to Python data type. - - Args: - sql_type: SQL data type. - - Returns: - Corresponding Python data type. - """ - sql_to_python_type = { - ddbc_sql_const.SQL_INTEGER.value: int, - ddbc_sql_const.SQL_VARCHAR.value: str, - ddbc_sql_const.SQL_WVARCHAR.value: str, - ddbc_sql_const.SQL_CHAR.value: str, - ddbc_sql_const.SQL_WCHAR.value: str, - ddbc_sql_const.SQL_FLOAT.value: float, - ddbc_sql_const.SQL_DOUBLE.value: float, - ddbc_sql_const.SQL_DECIMAL.value: decimal.Decimal, - ddbc_sql_const.SQL_NUMERIC.value: decimal.Decimal, - ddbc_sql_const.SQL_DATE.value: datetime.date, - ddbc_sql_const.SQL_TIMESTAMP.value: datetime.datetime, - ddbc_sql_const.SQL_TIME.value: datetime.time, - ddbc_sql_const.SQL_BIT.value: bool, - ddbc_sql_const.SQL_TINYINT.value: int, - ddbc_sql_const.SQL_SMALLINT.value: int, - ddbc_sql_const.SQL_BIGINT.value: int, - ddbc_sql_const.SQL_BINARY.value: bytes, - ddbc_sql_const.SQL_VARBINARY.value: bytes, - ddbc_sql_const.SQL_LONGVARBINARY.value: bytes, - ddbc_sql_const.SQL_GUID.value: uuid.UUID, - # Add more mappings as needed - } - return sql_to_python_type.get(sql_type, str) - @property def rownumber(self) -> int: """ @@ -2572,7 +2644,13 @@ def __del__(self): Destructor to ensure the cursor is closed when it is no longer needed. 
This is a safety net to ensure resources are cleaned up even if close() was
        not called explicitly.
-        If the cursor is already closed, it will not raise an exception during cleanup.
+
+        Error handling:
+            This destructor performs best-effort cleanup only. Any exceptions raised
+            while closing the cursor are caught and, when possible, logged instead of
+            being propagated, because raising from __del__ can cause hard-to-debug
+            failures during garbage collection. During interpreter shutdown, logging
+            may be suppressed if the logging subsystem is no longer available.
        """
        if "closed" not in self.__dict__ or not self.closed:
            try:
diff --git a/mssql_python/mssql_python.pyi b/mssql_python/mssql_python.pyi
index dd3fd96a..8308859b 100644
--- a/mssql_python/mssql_python.pyi
+++ b/mssql_python/mssql_python.pyi
@@ -81,6 +81,33 @@ def TimeFromTicks(ticks: int) -> datetime.time: ...
def TimestampFromTicks(ticks: int) -> datetime.datetime: ...
def Binary(value: Union[str, bytes, bytearray]) -> bytes: ...

+# SQLTypeCode - Dual-compatible type code for cursor.description
+class SQLTypeCode:
+    """
+    A type code that supports dual comparison with both SQL type integers and Python types.
+
+    This class is used in cursor.description[i][1] to provide backwards compatibility
+    with libraries like pandas (which compare with Python types like str, int, float)
+    while also supporting DB-API 2.0 style integer type code comparisons.
+
+    Examples:
+        >>> desc = cursor.description
+        >>> desc[0][1] == str  # True if column is string type
+        >>> desc[0][1] == 12   # True if SQL_VARCHAR
+        >>> int(desc[0][1])    # Returns the SQL type code as integer
+    """
+
+    type_code: int
+    python_type: type
+
+    def __init__(self, type_code: int, python_type: Optional[type] = None) -> None: ...
+    def __eq__(self, other: Any) -> bool: ...
+    def __ne__(self, other: Any) -> bool: ...
+    def __int__(self) -> int: ...
+    def __hash__(self) -> int: ...
+    def __repr__(self) -> str: ...
+    def __str__(self) -> str: ...
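
Because `description[1]` is typed as `SQLTypeCode` here but remains a plain `int` (or a bare Python type) in other DB-API drivers, portable consumers may want to normalize before branching. A minimal editorial sketch, not part of the patch, with an illustrative helper name:

```python
from typing import Any, Optional

def raw_sql_code(type_code: Any) -> Optional[int]:
    """Best-effort extraction of the integer SQL type code from
    cursor.description[i][1], whatever flavor the driver returned."""
    if isinstance(type_code, int):
        return type_code        # plain DB-API integer code
    try:
        return int(type_code)   # SQLTypeCode.__int__ yields the raw SQL code
    except (TypeError, ValueError):
        return None             # e.g. a driver exposing only bare Python types
```
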
+ # DB-API 2.0 Exception Hierarchy # https://www.python.org/dev/peps/pep-0249/#exceptions class Warning(Exception): @@ -133,7 +160,7 @@ class Row: description: List[ Tuple[ str, - Any, + SQLTypeCode, Optional[int], Optional[int], Optional[int], @@ -163,11 +190,14 @@ class Cursor: """ # DB-API 2.0 Required Attributes + # description is a sequence of 7-item tuples: + # (name, type_code, display_size, internal_size, precision, scale, null_ok) + # type_code is SQLTypeCode which compares equal to both SQL integers and Python types description: Optional[ List[ Tuple[ str, - Any, + SQLTypeCode, Optional[int], Optional[int], Optional[int], diff --git a/mssql_python/pybind/ddbc_bindings.cpp b/mssql_python/pybind/ddbc_bindings.cpp index f49d860a..702fda6d 100644 --- a/mssql_python/pybind/ddbc_bindings.cpp +++ b/mssql_python/pybind/ddbc_bindings.cpp @@ -27,6 +27,13 @@ #define MAX_DIGITS_IN_NUMERIC 64 #define SQL_MAX_NUMERIC_LEN 16 #define SQL_SS_XML (-152) +#define SQL_SS_UDT (-151) // SQL Server User-Defined Types (geometry, geography, hierarchyid) +#define SQL_DATETIME2 (42) +#define SQL_SMALLDATETIME (58) + +// NOTE: The following SQL Server-specific type constants MUST stay in sync with +// the corresponding values in mssql_python/constants.py (ConstantsDDBC enum): +// SQL_SS_TIME2, SQL_SS_XML, SQL_SS_UDT, SQL_DATETIME2, SQL_SMALLDATETIME, SQL_DATETIMEOFFSET #define STRINGIFY_FOR_CASE(x) \ case x: \ @@ -2987,6 +2994,11 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p } break; } + case SQL_SS_UDT: { + LOG("SQLGetData: Streaming UDT (geometry/geography) for column %d", i); + row.append(FetchLobColumnData(hStmt, i, SQL_C_BINARY, false, true)); + break; + } case SQL_SS_XML: { LOG("SQLGetData: Streaming XML for column %d", i); row.append(FetchLobColumnData(hStmt, i, SQL_C_WCHAR, true, false, "utf-16le")); @@ -3211,6 +3223,8 @@ SQLRETURN SQLGetData_wrap(SqlHandlePtr StatementHandle, SQLUSMALLINT colCount, p } case SQL_TIMESTAMP: case SQL_TYPE_TIMESTAMP: + case SQL_DATETIME2: + case SQL_SMALLDATETIME: case SQL_DATETIME: { SQL_TIMESTAMP_STRUCT timestampValue; ret = SQLGetData_ptr(hStmt, i, SQL_C_TYPE_TIMESTAMP, ×tampValue, @@ -3541,6 +3555,7 @@ SQLRETURN SQLBindColums(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& column case SQL_BINARY: case SQL_VARBINARY: case SQL_LONGVARBINARY: + case SQL_SS_UDT: // geography, geometry, hierarchyid // TODO: handle variable length data correctly. 
This logic won't
            // suffice
            HandleZeroColumnSizeAtFetch(columnSize);
@@ -3671,6 +3686,7 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum
        case SQL_BINARY:
        case SQL_VARBINARY:
        case SQL_LONGVARBINARY:
+       case SQL_SS_UDT:  // geography, geometry, hierarchyid
            columnProcessors[col] = ColumnProcessors::ProcessBinary;
            break;
        default:
@@ -3795,6 +3811,8 @@ SQLRETURN FetchBatchData(SQLHSTMT hStmt, ColumnBuffers& buffers, py::list& colum
            }
            case SQL_TIMESTAMP:
            case SQL_TYPE_TIMESTAMP:
+           case SQL_DATETIME2:
+           case SQL_SMALLDATETIME:
            case SQL_DATETIME: {
                const SQL_TIMESTAMP_STRUCT& ts = buffers.timestampBuffers[col - 1][i];
                PyObject* datetimeObj = PythonObjectCache::get_datetime_class()(
@@ -3974,6 +3992,9 @@ size_t calculateRowSize(py::list& columnNames, SQLUSMALLINT numCols) {
            case SQL_SS_TIMESTAMPOFFSET:
                rowSize += sizeof(DateTimeOffset);
                break;
+           case SQL_SS_UDT:
+               rowSize += columnSize;  // UDT types use column size as-is
+               break;
            default:
                std::wstring columnName = columnMeta["ColumnName"].cast<std::wstring>();
                std::ostringstream errorString;
@@ -4028,7 +4049,7 @@ SQLRETURN FetchMany_wrap(SqlHandlePtr StatementHandle, py::list& rows, int fetch
        if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR ||
             dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR || dataType == SQL_VARBINARY ||
-            dataType == SQL_LONGVARBINARY || dataType == SQL_SS_XML) &&
+            dataType == SQL_LONGVARBINARY || dataType == SQL_SS_XML || dataType == SQL_SS_UDT) &&
            (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > SQL_MAX_LOB_SIZE)) {
            lobColumns.push_back(i + 1);  // 1-based
        }
@@ -4162,7 +4183,7 @@ SQLRETURN FetchAll_wrap(SqlHandlePtr StatementHandle, py::list& rows,
        if ((dataType == SQL_WVARCHAR || dataType == SQL_WLONGVARCHAR ||
             dataType == SQL_VARCHAR || dataType == SQL_LONGVARCHAR || dataType == SQL_VARBINARY ||
-            dataType == SQL_LONGVARBINARY || dataType == SQL_SS_XML) &&
+            dataType == SQL_LONGVARBINARY || dataType == SQL_SS_XML || dataType == SQL_SS_UDT) &&
            (columnSize == 0 || columnSize == SQL_NO_TOTAL || columnSize > SQL_MAX_LOB_SIZE)) {
            lobColumns.push_back(i + 1);  // 1-based
        }
diff --git a/pyproject.toml b/pyproject.toml
index 538a4a99..b2c267c4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.black]
line-length = 100
-target-version = ['py38', 'py39', 'py310', 'py311']
+target-version = ['py38', 'py39', 'py310', 'py311', 'py312', 'py313']
include = '\.pyi?$'
extend-exclude = '''
/(
diff --git a/tests/test_002_types.py b/tests/test_002_types.py
index 4828d72e..92077a1c 100644
--- a/tests/test_002_types.py
+++ b/tests/test_002_types.py
@@ -1267,3 +1267,211 @@ def test_utf8_4byte_sequence_complete_coverage():
        assert len(result) > 0, f"Invalid pattern should produce some output"

    assert True, "Complete 4-byte sequence coverage validated"
+
+
+# =============================================================================
+# SQLTypeCode Unit Tests (DB-API 2.0 + pandas compatibility)
+# =============================================================================
+
+
+class TestSQLTypeCode:
+    """
+    Unit tests for SQLTypeCode class.
+
+    SQLTypeCode provides dual compatibility:
+    - Compares equal to Python type objects (str, int, float, etc.)
for pandas compatibility + - Compares equal to SQL integer codes for DB-API 2.0 compliance + """ + + def test_sqltypecode_import(self): + """Test that SQLTypeCode is importable from public API.""" + from mssql_python import SQLTypeCode + + assert SQLTypeCode is not None + + def test_sqltypecode_equals_python_type_str(self): + """Test SQLTypeCode for SQL_WVARCHAR (-9) equals str.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(-9) # SQL_WVARCHAR + assert tc == str, "SQLTypeCode(-9) should equal str" + assert not (tc != str), "SQLTypeCode(-9) should not be != str" + + def test_sqltypecode_equals_python_type_int(self): + """Test SQLTypeCode for SQL_INTEGER (4) equals int.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(4) # SQL_INTEGER + assert tc == int, "SQLTypeCode(4) should equal int" + + def test_sqltypecode_equals_python_type_float(self): + """Test SQLTypeCode for SQL_REAL (7) equals float.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(7) # SQL_REAL + assert tc == float, "SQLTypeCode(7) should equal float" + + def test_sqltypecode_equals_python_type_bytes(self): + """Test SQLTypeCode for SQL_BINARY (-2) equals bytes.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(-2) # SQL_BINARY + assert tc == bytes, "SQLTypeCode(-2) should equal bytes" + + def test_sqltypecode_equals_sql_integer_code(self): + """Test SQLTypeCode equals its raw SQL integer code.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(4) # SQL_INTEGER + assert tc == 4, "SQLTypeCode(4) should equal 4" + assert tc == SQLTypeCode(4).type_code, "SQLTypeCode(4) should equal its type_code" + + def test_sqltypecode_equals_negative_sql_code(self): + """Test SQLTypeCode with negative SQL codes (e.g., SQL_WVARCHAR = -9).""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(-9) # SQL_WVARCHAR + assert tc == -9, "SQLTypeCode(-9) should equal -9" + + def test_sqltypecode_dual_compatibility(self): + """Test that SQLTypeCode equals both Python type AND SQL code simultaneously.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(4) # SQL_INTEGER + # Must satisfy BOTH comparisons - this is the key feature + assert tc == int and tc == 4, "SQLTypeCode should equal both int and 4" + + def test_sqltypecode_int_conversion(self): + """Test int(SQLTypeCode) returns raw SQL code.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(-9) + assert int(tc) == -9, "int(SQLTypeCode(-9)) should return -9" + tc2 = SQLTypeCode(4) + assert int(tc2) == 4, "int(SQLTypeCode(4)) should return 4" + + def test_sqltypecode_hash(self): + """Test SQLTypeCode is hashable (for use in dicts/sets).""" + from mssql_python import SQLTypeCode + + tc1 = SQLTypeCode(4) + tc2 = SQLTypeCode(4) + tc3 = SQLTypeCode(-9) + # Same code should have same hash + assert hash(tc1) == hash(tc2) + # Different codes may have different hashes (not guaranteed but typical) + s = {tc1, tc3} + assert len(s) == 2 + + def test_sqltypecode_repr(self): + """Test SQLTypeCode has informative repr.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(4) + r = repr(tc) + assert "4" in r, "repr should contain the SQL code" + assert "SQLTypeCode" in r, "repr should contain class name" + + def test_sqltypecode_type_code_property(self): + """Test SQLTypeCode.type_code returns raw SQL code.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(-9) + assert tc.type_code == -9 + tc2 = SQLTypeCode(93) # SQL_TYPE_TIMESTAMP + assert tc2.type_code == 93 + + def 
test_sqltypecode_python_type_property(self): + """Test SQLTypeCode.python_type returns mapped type.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(4) # SQL_INTEGER + assert tc.python_type == int + tc2 = SQLTypeCode(-9) # SQL_WVARCHAR + assert tc2.python_type == str + + def test_sqltypecode_unknown_type_maps_to_str(self): + """Test unknown SQL codes map to str by default.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(99999) # Unknown code + assert tc.python_type == str + assert tc == str # Should still work for comparison + + def test_sqltypecode_pandas_simulation(self): + """ + Simulate pandas read_sql type checking behavior. + + Pandas checks `cursor.description[i][1] == str` to determine + if a column should be treated as string data. + """ + from mssql_python import SQLTypeCode + + # Simulate a description tuple like pandas receives + description = [ + ("name", SQLTypeCode(-9), None, None, None, None, None), # nvarchar + ("age", SQLTypeCode(4), None, None, None, None, None), # int + ("salary", SQLTypeCode(6), None, None, None, None, None), # float + ] + + # Pandas-style type checking + string_columns = [] + for name, type_code, *rest in description: + if type_code == str: + string_columns.append(name) + + assert string_columns == ["name"], "Only 'name' column should be detected as string" + + # Verify other types work too + for name, type_code, *rest in description: + if type_code == int: + assert name == "age" + if type_code == float: + assert name == "salary" + + def test_sqltypecode_dbapi_simulation(self): + """ + Simulate DB-API 2.0 style type checking with integer codes. + """ + from mssql_python import SQLTypeCode + + # Simulate description + description = [ + ("id", SQLTypeCode(4), None, None, None, None, None), # SQL_INTEGER + ("data", SQLTypeCode(-9), None, None, None, None, None), # SQL_WVARCHAR + ] + + # DB-API style: check raw SQL code + for name, type_code, *rest in description: + if type_code == 4: # SQL_INTEGER + assert name == "id" + if type_code == -9: # SQL_WVARCHAR + assert name == "data" + + def test_sqltypecode_equality_with_other_sqltypecode(self): + """Test SQLTypeCode equality with another SQLTypeCode.""" + from mssql_python import SQLTypeCode + + tc1 = SQLTypeCode(4) + tc2 = SQLTypeCode(4) + tc3 = SQLTypeCode(-9) + + # Explicitly test __eq__ with SQLTypeCode argument (covers cursor.py line 128) + result1 = tc1.__eq__(tc2) # Same type codes + result2 = tc1.__eq__(tc3) # Different type codes + assert result1 is True, "Same code SQLTypeCodes should be equal" + assert result2 is False, "Different code SQLTypeCodes should not be equal" + + # Also test via == operator + assert tc1 == tc2, "Same code SQLTypeCodes should be equal via ==" + assert tc1 != tc3, "Different code SQLTypeCodes should not be equal via !=" + + def test_sqltypecode_inequality(self): + """Test SQLTypeCode inequality comparisons.""" + from mssql_python import SQLTypeCode + + tc = SQLTypeCode(4) + assert tc != str, "SQL_INTEGER should not equal str" + assert tc != float, "SQL_INTEGER should not equal float" + assert tc != 5, "SQLTypeCode(4) should not equal 5" + assert tc != -9, "SQLTypeCode(4) should not equal -9" diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py index 57549629..c517174c 100644 --- a/tests/test_004_cursor.py +++ b/tests/test_004_cursor.py @@ -16,7 +16,6 @@ from contextlib import closing import mssql_python import uuid -import re from conftest import is_azure_sql_connection # Setup test table @@ -1197,7 +1196,7 @@ def 
test_fetchall(cursor): def test_fetchall_lob(cursor): - """Test fetching all rows""" + """Test fetching all rows with LOB columns""" cursor.execute("SELECT * FROM #pytest_all_data_types") rows = cursor.fetchall() assert isinstance(rows, list), "fetchall should return a list" @@ -2382,16 +2381,63 @@ def test_drop_tables_for_join(cursor, db_connection): def test_cursor_description(cursor): - """Test cursor description""" + """Test cursor description with SQLTypeCode for backwards compatibility.""" cursor.execute("SELECT database_id, name FROM sys.databases;") desc = cursor.description - expected_description = [ - ("database_id", int, None, 10, 10, 0, False), - ("name", str, None, 128, 128, 0, False), - ] - assert len(desc) == len(expected_description), "Description length mismatch" - for desc, expected in zip(desc, expected_description): - assert desc == expected, f"Description mismatch: {desc} != {expected}" + + from mssql_python.constants import ConstantsDDBC as ddbc_sql_const + + # Verify length + assert len(desc) == 2, "Description should have 2 columns" + + # Test 1: DB-API 2.0 compliant - compare with SQL type codes (integers) + assert desc[0][1] == ddbc_sql_const.SQL_INTEGER.value, "database_id should be SQL_INTEGER (4)" + assert desc[1][1] == ddbc_sql_const.SQL_WVARCHAR.value, "name should be SQL_WVARCHAR (-9)" + + # Test 2: Backwards compatible - compare with Python types (for pandas, etc.) + assert desc[0][1] == int, "database_id should also compare equal to Python int" + assert desc[1][1] == str, "name should also compare equal to Python str" + + # Test 3: Can convert to int to get raw SQL code + assert int(desc[0][1]) == 4, "int(type_code) should return SQL_INTEGER (4)" + assert int(desc[1][1]) == -9, "int(type_code) should return SQL_WVARCHAR (-9)" + + # Test 4: Verify other tuple elements + assert desc[0][0] == "database_id", "First column name should be database_id" + assert desc[1][0] == "name", "Second column name should be name" + + +def test_cursor_description_pandas_compatibility(cursor): + """ + Test that cursor.description type_code works with pandas-style type checking. + + Pandas and other libraries check `cursor.description[i][1] == str` to determine + column types. This test ensures SQLTypeCode maintains backwards compatibility. 
+ """ + cursor.execute("SELECT database_id, name FROM sys.databases;") + desc = cursor.description + + # Simulate what pandas does internally when reading SQL results + # pandas checks: if description[i][1] == str: treat as string column + type_map = {} + for col_desc in desc: + col_name = col_desc[0] + type_code = col_desc[1] + + # This is how pandas-like code typically checks types + if type_code == str: + type_map[col_name] = "string" + elif type_code == int: + type_map[col_name] = "integer" + elif type_code == float: + type_map[col_name] = "float" + elif type_code == bytes: + type_map[col_name] = "bytes" + else: + type_map[col_name] = "other" + + assert type_map["database_id"] == "integer", "database_id should be detected as integer" + assert type_map["name"] == "string", "name should be detected as string" def test_parse_datetime(cursor, db_connection): @@ -8984,13 +9030,10 @@ def test_decimal_separator_fetch_regression(cursor, db_connection): assert val == decimal.Decimal("99.99") finally: - # Reset separator to default just in case + # Reset separator to default mssql_python.setDecimalSeparator(".") - try: - cursor.execute("DROP TABLE IF EXISTS #TestDecimal") - db_connection.commit() - except Exception: - pass + cursor.execute("DROP TABLE IF EXISTS #TestDecimal") + db_connection.commit() def test_datetimeoffset_read_write(cursor, db_connection): @@ -13405,11 +13448,8 @@ def test_decimal_scientific_notation_to_varchar(cursor, db_connection, values, d ), f"{description}: Row {i} mismatch - expected {expected_val}, got {stored_val}" finally: - try: - cursor.execute(f"DROP TABLE {table_name}") - db_connection.commit() - except: - pass + cursor.execute(f"DROP TABLE IF EXISTS {table_name}") + db_connection.commit() SMALL_XML = "1" @@ -13518,6 +13558,1284 @@ def test_xml_malformed_input(cursor, db_connection): db_connection.commit() +# ==================== GEOGRAPHY TYPE TESTS ==================== + +# Test geography data - Well-Known Text (WKT) format +POINT_WKT = "POINT(-122.34900 47.65100)" # Seattle coordinates +LINESTRING_WKT = "LINESTRING(-122.360 47.656, -122.343 47.656)" +POLYGON_WKT = "POLYGON((-122.358 47.653, -122.348 47.649, -122.348 47.658, -122.358 47.653))" +MULTIPOINT_WKT = "MULTIPOINT((-122.34900 47.65100), (-122.11100 47.67700))" +COLLECTION_WKT = "GEOMETRYCOLLECTION(POINT(-122.34900 47.65100))" + + +def test_geography_basic_insert_fetch(cursor, db_connection): + """Test insert and fetch of a basic geography Point value.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_basic (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);" + ) + db_connection.commit() + + # Insert using STGeomFromText + cursor.execute( + "INSERT INTO #pytest_geography_basic (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + POINT_WKT, + ) + db_connection.commit() + + # Fetch as binary (default behavior) + row = cursor.execute("SELECT geo_col FROM #pytest_geography_basic;").fetchone() + assert row[0] is not None, "Geography value should not be None" + assert isinstance(row[0], bytes), "Geography should be returned as bytes" + assert len(row[0]) > 0, "Geography binary should have content" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_basic;") + db_connection.commit() + + +def test_geography_as_text(cursor, db_connection): + """Test fetching geography as WKT text using STAsText().""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_text (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);" + ) + db_connection.commit() + + 
cursor.execute( + "INSERT INTO #pytest_geography_text (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + POINT_WKT, + ) + db_connection.commit() + + # Fetch as text using STAsText() + row = cursor.execute( + "SELECT geo_col.STAsText() as wkt FROM #pytest_geography_text;" + ).fetchone() + # SQL Server normalizes WKT format (adds space, removes trailing zeros) + assert row[0] is not None, "Geography WKT should not be None" + assert row[0].startswith("POINT"), "Should be a POINT geometry" + assert "-122.349" in row[0] and "47.651" in row[0], "Should contain expected coordinates" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_text;") + db_connection.commit() + + +def test_geography_various_types(cursor, db_connection): + """Test insert and fetch of various geography types.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_types (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL, description NVARCHAR(100));" + ) + db_connection.commit() + + test_cases = [ + (POINT_WKT, "Point", "POINT"), + (LINESTRING_WKT, "LineString", "LINESTRING"), + (POLYGON_WKT, "Polygon", "POLYGON"), + (MULTIPOINT_WKT, "MultiPoint", "MULTIPOINT"), + (COLLECTION_WKT, "GeometryCollection", "GEOMETRYCOLLECTION"), + ] + + for wkt, desc, _ in test_cases: + cursor.execute( + "INSERT INTO #pytest_geography_types (geo_col, description) VALUES (geography::STGeomFromText(?, 4326), ?);", + (wkt, desc), + ) + db_connection.commit() + + # Fetch all and verify + rows = cursor.execute( + "SELECT geo_col.STAsText() as wkt, description FROM #pytest_geography_types ORDER BY id;" + ).fetchall() + + for i, (_, expected_desc, expected_type) in enumerate(test_cases): + assert rows[i][0] is not None, f"{expected_desc} WKT should not be None" + assert rows[i][0].startswith( + expected_type + ), f"{expected_desc} should start with {expected_type}" + assert rows[i][1] == expected_desc, f"Description should match" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_types;") + db_connection.commit() + + +def test_geography_null_value(cursor, db_connection): + """Test insert and fetch of NULL geography values.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_null (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);" + ) + db_connection.commit() + + cursor.execute("INSERT INTO #pytest_geography_null (geo_col) VALUES (?);", None) + db_connection.commit() + + row = cursor.execute("SELECT geo_col FROM #pytest_geography_null;").fetchone() + assert row[0] is None, "NULL geography should be returned as None" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_null;") + db_connection.commit() + + +def test_geography_fetchone(cursor, db_connection): + """Test fetchone with geography columns.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_fetchone (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);" + ) + db_connection.commit() + + cursor.execute( + "INSERT INTO #pytest_geography_fetchone (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + POINT_WKT, + ) + db_connection.commit() + + cursor.execute("SELECT geo_col FROM #pytest_geography_fetchone;") + row = cursor.fetchone() + assert row is not None, "fetchone should return a row" + assert isinstance(row[0], bytes), "Geography should be bytes" + + # Verify no more rows + assert cursor.fetchone() is None, "Should be no more rows" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_fetchone;") + db_connection.commit() + + +def test_geography_fetchmany(cursor, 
db_connection):
+    """Test fetchmany with geography columns."""
+    try:
+        cursor.execute(
+            "CREATE TABLE #pytest_geography_fetchmany (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);"
+        )
+        db_connection.commit()
+
+        # Insert multiple rows
+        for i in range(5):
+            cursor.execute(
+                "INSERT INTO #pytest_geography_fetchmany (geo_col) VALUES (geography::STGeomFromText(?, 4326));",
+                POINT_WKT,
+            )
+        db_connection.commit()
+
+        cursor.execute("SELECT geo_col FROM #pytest_geography_fetchmany;")
+        rows = cursor.fetchmany(3)
+        assert isinstance(rows, list), "fetchmany should return a list"
+        assert len(rows) == 3, "fetchmany should return 3 rows"
+        for row in rows:
+            assert isinstance(row[0], bytes), "Each geography should be bytes"
+
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_geography_fetchmany;")
+        db_connection.commit()
+
+
+def test_geography_fetchall(cursor, db_connection):
+    """Test fetchall with geography columns."""
+    try:
+        cursor.execute(
+            "CREATE TABLE #pytest_geography_fetchall (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);"
+        )
+        db_connection.commit()
+
+        # Insert multiple rows
+        num_rows = 10
+        for i in range(num_rows):
+            cursor.execute(
+                "INSERT INTO #pytest_geography_fetchall (geo_col) VALUES (geography::STGeomFromText(?, 4326));",
+                POINT_WKT,
+            )
+        db_connection.commit()
+
+        cursor.execute("SELECT geo_col FROM #pytest_geography_fetchall;")
+        rows = cursor.fetchall()
+        assert isinstance(rows, list), "fetchall should return a list"
+        assert len(rows) == num_rows, f"fetchall should return {num_rows} rows"
+        for row in rows:
+            assert isinstance(row[0], bytes), "Each geography should be bytes"
+
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_geography_fetchall;")
+        db_connection.commit()
+
+
+def test_geography_executemany(cursor, db_connection):
+    """Test batch insert (executemany) of multiple geography values."""
+    try:
+        cursor.execute(
+            "CREATE TABLE #pytest_geography_batch (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL, name NVARCHAR(50));"
+        )
+        db_connection.commit()
+
+        test_data = [
+            (POINT_WKT, "Point1"),
+            (LINESTRING_WKT, "Line1"),
+            (POLYGON_WKT, "Poly1"),
+        ]
+
+        # Note: binding geography parameters through executemany is not exercised
+        # here; this test batch-inserts only the names and leaves geo_col NULL.
+        # Per-row geography::STGeomFromText inserts are covered by the execute()
+        # tests above.
+        cursor.executemany(
+            "INSERT INTO #pytest_geography_batch (name) VALUES (?);",
+            [(name,) for _, name in test_data],
+        )
+        db_connection.commit()
+
+        rows = cursor.execute("SELECT name FROM #pytest_geography_batch ORDER BY id;").fetchall()
+        assert len(rows) == len(test_data), "Should have inserted all rows"
+
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_geography_batch;")
+        db_connection.commit()
+
+
+def test_geography_large_value_lob_streaming(cursor, db_connection):
+    """Test large geography values to verify LOB/streaming behavior."""
+    try:
+        cursor.execute(
+            "CREATE TABLE #pytest_geography_large (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);"
+        )
+        db_connection.commit()
+
+        # Create a valid polygon with 100 vertices: large enough to exercise the
+        # LOB fetch path, but small enough to pass inline as a query parameter.
+        large_polygon = (
+            "POLYGON(("
+            + ", ".join([f"{-122.5 + i*0.0001} {47.5 + i*0.0001}" for i in range(100)])
+            + ", -122.5 47.5))"
+        )
+
+        # Insert large polygon
+        cursor.execute(
+            "INSERT INTO #pytest_geography_large (geo_col) VALUES (geography::STGeomFromText(?, 4326));",
+            large_polygon,
+        )
+        
db_connection.commit() + + # Fetch the large geography + row = cursor.execute("SELECT geo_col FROM #pytest_geography_large;").fetchone() + assert row[0] is not None, "Large geography should not be None" + assert isinstance(row[0], bytes), "Large geography should be bytes" + # Just verify it's non-empty bytes (don't check for 8000 byte threshold as that varies) + assert len(row[0]) > 0, "Large geography should have content" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_large;") + db_connection.commit() + + +def test_geography_mixed_with_other_types(cursor, db_connection): + """Test geography columns mixed with other data types.""" + try: + cursor.execute("""CREATE TABLE #pytest_geography_mixed ( + id INT PRIMARY KEY IDENTITY(1,1), + name NVARCHAR(100), + geo_col GEOGRAPHY NULL, + created_date DATETIME, + score FLOAT + );""") + db_connection.commit() + + cursor.execute( + """INSERT INTO #pytest_geography_mixed (name, geo_col, created_date, score) + VALUES (?, geography::STGeomFromText(?, 4326), ?, ?);""", + ("Seattle", POINT_WKT, "2025-11-26", 95.5), + ) + db_connection.commit() + + row = cursor.execute( + "SELECT name, geo_col, created_date, score FROM #pytest_geography_mixed;" + ).fetchone() + assert row[0] == "Seattle", "Name should match" + assert isinstance(row[1], bytes), "Geography should be bytes" + assert row[3] == 95.5, "Score should match" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_mixed;") + db_connection.commit() + + +def test_geography_null_and_empty_mixed(cursor, db_connection): + """Test mix of NULL and valid geography values.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_null_mixed (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);" + ) + db_connection.commit() + + cursor.execute("INSERT INTO #pytest_geography_null_mixed (geo_col) VALUES (?);", None) + cursor.execute( + "INSERT INTO #pytest_geography_null_mixed (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + POINT_WKT, + ) + cursor.execute("INSERT INTO #pytest_geography_null_mixed (geo_col) VALUES (?);", None) + db_connection.commit() + + rows = cursor.execute( + "SELECT geo_col FROM #pytest_geography_null_mixed ORDER BY id;" + ).fetchall() + assert len(rows) == 3, "Should have 3 rows" + assert rows[0][0] is None, "First row should be NULL" + assert isinstance(rows[1][0], bytes), "Second row should be bytes" + assert rows[2][0] is None, "Third row should be NULL" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_null_mixed;") + db_connection.commit() + + +def test_geography_with_srid(cursor, db_connection): + """Test geography with different SRID values.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_srid (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL, srid INT);" + ) + db_connection.commit() + + # WGS84 (most common) + cursor.execute( + "INSERT INTO #pytest_geography_srid (geo_col, srid) VALUES (geography::STGeomFromText(?, 4326), 4326);", + POINT_WKT, + ) + db_connection.commit() + + row = cursor.execute( + "SELECT geo_col.STSrid as srid FROM #pytest_geography_srid;" + ).fetchone() + assert row[0] == 4326, "SRID should be 4326 (WGS84)" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_srid;") + db_connection.commit() + + +def test_geography_methods(cursor, db_connection): + """Test various geography methods (STDistance, STArea, etc.).""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_methods (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);" + ) + 
db_connection.commit() + + # Insert a polygon to test area + cursor.execute( + "INSERT INTO #pytest_geography_methods (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + POLYGON_WKT, + ) + db_connection.commit() + + # Test STArea + row = cursor.execute( + "SELECT geo_col.STArea() as area FROM #pytest_geography_methods;" + ).fetchone() + assert row[0] is not None, "STArea should return a value" + assert row[0] > 0, "Polygon should have positive area" + + # Test STLength for linestring + cursor.execute( + "UPDATE #pytest_geography_methods SET geo_col = geography::STGeomFromText(?, 4326);", + LINESTRING_WKT, + ) + db_connection.commit() + + row = cursor.execute( + "SELECT geo_col.STLength() as length FROM #pytest_geography_methods;" + ).fetchone() + assert row[0] is not None, "STLength should return a value" + assert row[0] > 0, "LineString should have positive length" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_methods;") + db_connection.commit() + + +def test_geography_output_converter(cursor, db_connection): + """Test using output converter to process geography data.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_converter (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);" + ) + db_connection.commit() + + cursor.execute( + "INSERT INTO #pytest_geography_converter (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + POINT_WKT, + ) + db_connection.commit() + + # Define a converter that tracks if it was called + converted = [] + + def geography_converter(value): + if value is None: + return None + converted.append(True) + return value # Just return as-is for this test + + # Register the converter for SQL_SS_UDT type (-151) + db_connection.add_output_converter(-151, geography_converter) + + # Fetch data - converter should be called + row = cursor.execute("SELECT geo_col FROM #pytest_geography_converter;").fetchone() + assert len(converted) > 0, "Converter should have been called" + assert isinstance(row[0], bytes), "Geography should still be bytes" + + # Clean up converter + db_connection.remove_output_converter(-151) + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_converter;") + db_connection.commit() + + +def test_geography_description_metadata(cursor, db_connection): + """Test cursor.description for geography columns.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geography_desc (id INT PRIMARY KEY, geo_col GEOGRAPHY NULL);" + ) + db_connection.commit() + + cursor.execute("SELECT id, geo_col FROM #pytest_geography_desc;") + desc = cursor.description + + assert len(desc) == 2, "Should have 2 columns in description" + assert desc[0][0] == "id", "First column should be 'id'" + assert desc[1][0] == "geo_col", "Second column should be 'geo_col'" + # Note: Geography type ID might vary, but should be present + assert desc[1][1] is not None, "Geography column should have a type" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_desc;") + db_connection.commit() + + +def test_geography_complex_operations(cursor, db_connection): + """Test complex geography operations with multiple geometries.""" + try: + cursor.execute("""CREATE TABLE #pytest_geography_complex ( + id INT PRIMARY KEY IDENTITY(1,1), + geo1 GEOGRAPHY NULL, + geo2 GEOGRAPHY NULL + );""") + db_connection.commit() + + # Insert two points + point1 = "POINT(-122.34900 47.65100)" # Seattle + point2 = "POINT(-73.98500 40.75800)" # New York + + cursor.execute( + """INSERT INTO #pytest_geography_complex (geo1, geo2) + VALUES 
(geography::STGeomFromText(?, 4326), geography::STGeomFromText(?, 4326));""", + (point1, point2), + ) + db_connection.commit() + + # Calculate distance between points + row = cursor.execute("""SELECT geo1.STDistance(geo2) as distance_meters + FROM #pytest_geography_complex;""").fetchone() + + assert row[0] is not None, "Distance should be calculated" + assert row[0] > 0, "Distance should be positive" + # Seattle to New York is approximately 3,900 km = 3,900,000 meters + assert row[0] > 3000000, "Distance should be over 3,000 km" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_complex;") + db_connection.commit() + + +# ==================== GEOMETRY TYPE TESTS ==================== + +# Test geometry data - Well-Known Text (WKT) format (planar/2D coordinate system) +GEOMETRY_POINT_WKT = "POINT(100 200)" +GEOMETRY_LINESTRING_WKT = "LINESTRING(0 0, 100 100, 200 0)" +GEOMETRY_POLYGON_WKT = "POLYGON((0 0, 100 0, 100 100, 0 100, 0 0))" +GEOMETRY_MULTIPOINT_WKT = "MULTIPOINT((0 0), (100 100))" + + +def test_geometry_basic_insert_fetch(cursor, db_connection): + """Test insert and fetch of a basic geometry Point value.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geometry_basic (id INT PRIMARY KEY IDENTITY(1,1), geom_col GEOMETRY NULL);" + ) + db_connection.commit() + + # Insert using STGeomFromText (no SRID needed for geometry) + cursor.execute( + "INSERT INTO #pytest_geometry_basic (geom_col) VALUES (geometry::STGeomFromText(?, 0));", + GEOMETRY_POINT_WKT, + ) + db_connection.commit() + + # Fetch as binary (default behavior) + row = cursor.execute("SELECT geom_col FROM #pytest_geometry_basic;").fetchone() + assert row[0] is not None, "Geometry value should not be None" + assert isinstance(row[0], bytes), "Geometry should be returned as bytes" + assert len(row[0]) > 0, "Geometry binary should have content" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_basic;") + db_connection.commit() + + +def test_geometry_as_text(cursor, db_connection): + """Test fetching geometry as WKT text using STAsText().""" + try: + cursor.execute( + "CREATE TABLE #pytest_geometry_text (id INT PRIMARY KEY IDENTITY(1,1), geom_col GEOMETRY NULL);" + ) + db_connection.commit() + + cursor.execute( + "INSERT INTO #pytest_geometry_text (geom_col) VALUES (geometry::STGeomFromText(?, 0));", + GEOMETRY_POINT_WKT, + ) + db_connection.commit() + + # Fetch as text using STAsText() + row = cursor.execute( + "SELECT geom_col.STAsText() as wkt FROM #pytest_geometry_text;" + ).fetchone() + assert row[0] is not None, "Geometry WKT should not be None" + assert row[0].startswith("POINT"), "Should be a POINT geometry" + assert "100" in row[0] and "200" in row[0], "Should contain expected coordinates" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_text;") + db_connection.commit() + + +def test_geometry_various_types(cursor, db_connection): + """Test insert and fetch of various geometry types.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geometry_types (id INT PRIMARY KEY IDENTITY(1,1), geom_col GEOMETRY NULL, description NVARCHAR(100));" + ) + db_connection.commit() + + test_cases = [ + (GEOMETRY_POINT_WKT, "Point", "POINT"), + (GEOMETRY_LINESTRING_WKT, "LineString", "LINESTRING"), + (GEOMETRY_POLYGON_WKT, "Polygon", "POLYGON"), + (GEOMETRY_MULTIPOINT_WKT, "MultiPoint", "MULTIPOINT"), + ] + + for wkt, desc, _ in test_cases: + cursor.execute( + "INSERT INTO #pytest_geometry_types (geom_col, description) VALUES (geometry::STGeomFromText(?, 0), ?);", + (wkt, desc), + ) 
+ db_connection.commit() + + # Fetch all and verify + rows = cursor.execute( + "SELECT geom_col.STAsText() as wkt, description FROM #pytest_geometry_types ORDER BY id;" + ).fetchall() + + for i, (_, expected_desc, expected_type) in enumerate(test_cases): + assert rows[i][0] is not None, f"{expected_desc} WKT should not be None" + assert rows[i][0].startswith( + expected_type + ), f"{expected_desc} should start with {expected_type}" + assert rows[i][1] == expected_desc, f"Description should match" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_types;") + db_connection.commit() + + +def test_geometry_null_value(cursor, db_connection): + """Test insert and fetch of NULL geometry values.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geometry_null (id INT PRIMARY KEY IDENTITY(1,1), geom_col GEOMETRY NULL);" + ) + db_connection.commit() + + cursor.execute("INSERT INTO #pytest_geometry_null (geom_col) VALUES (?);", None) + db_connection.commit() + + row = cursor.execute("SELECT geom_col FROM #pytest_geometry_null;").fetchone() + assert row[0] is None, "NULL geometry should be returned as None" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_null;") + db_connection.commit() + + +def test_geometry_fetchall(cursor, db_connection): + """Test fetchall with geometry columns.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geometry_fetchall (id INT PRIMARY KEY IDENTITY(1,1), geom_col GEOMETRY NULL);" + ) + db_connection.commit() + + # Insert multiple rows + num_rows = 5 + for i in range(num_rows): + cursor.execute( + "INSERT INTO #pytest_geometry_fetchall (geom_col) VALUES (geometry::STGeomFromText(?, 0));", + GEOMETRY_POINT_WKT, + ) + db_connection.commit() + + cursor.execute("SELECT geom_col FROM #pytest_geometry_fetchall;") + rows = cursor.fetchall() + assert isinstance(rows, list), "fetchall should return a list" + assert len(rows) == num_rows, f"fetchall should return {num_rows} rows" + for row in rows: + assert isinstance(row[0], bytes), "Each geometry should be bytes" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_fetchall;") + db_connection.commit() + + +def test_geometry_methods(cursor, db_connection): + """Test various geometry methods (STArea, STLength, STDistance).""" + try: + cursor.execute( + "CREATE TABLE #pytest_geometry_methods (id INT PRIMARY KEY IDENTITY(1,1), geom_col GEOMETRY NULL);" + ) + db_connection.commit() + + # Insert a polygon to test area + cursor.execute( + "INSERT INTO #pytest_geometry_methods (geom_col) VALUES (geometry::STGeomFromText(?, 0));", + GEOMETRY_POLYGON_WKT, + ) + db_connection.commit() + + # Test STArea - 100x100 square = 10000 sq units + row = cursor.execute( + "SELECT geom_col.STArea() as area FROM #pytest_geometry_methods;" + ).fetchone() + assert row[0] is not None, "STArea should return a value" + assert row[0] == 10000, "Square should have area of 10000" + + # Test STLength for linestring + cursor.execute( + "UPDATE #pytest_geometry_methods SET geom_col = geometry::STGeomFromText(?, 0);", + GEOMETRY_LINESTRING_WKT, + ) + db_connection.commit() + + row = cursor.execute( + "SELECT geom_col.STLength() as length FROM #pytest_geometry_methods;" + ).fetchone() + assert row[0] is not None, "STLength should return a value" + assert row[0] > 0, "LineString should have positive length" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_methods;") + db_connection.commit() + + +def test_geometry_description_metadata(cursor, db_connection): + """Test cursor.description for 
geometry columns.""" + try: + cursor.execute( + "CREATE TABLE #pytest_geometry_desc (id INT PRIMARY KEY, geom_col GEOMETRY NULL);" + ) + db_connection.commit() + + cursor.execute("SELECT id, geom_col FROM #pytest_geometry_desc;") + desc = cursor.description + + assert len(desc) == 2, "Should have 2 columns in description" + assert desc[0][0] == "id", "First column should be 'id'" + assert desc[1][0] == "geom_col", "Second column should be 'geom_col'" + # Geometry uses SQL_SS_UDT (-151) + assert int(desc[1][1]) == -151, "Geometry type should be SQL_SS_UDT (-151)" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_desc;") + db_connection.commit() + + +def test_geometry_mixed_with_other_types(cursor, db_connection): + """Test geometry columns mixed with other data types.""" + try: + cursor.execute("""CREATE TABLE #pytest_geometry_mixed ( + id INT PRIMARY KEY IDENTITY(1,1), + name NVARCHAR(100), + geom_col GEOMETRY NULL, + area FLOAT + );""") + db_connection.commit() + + cursor.execute( + """INSERT INTO #pytest_geometry_mixed (name, geom_col, area) + VALUES (?, geometry::STGeomFromText(?, 0), ?);""", + ("Square", GEOMETRY_POLYGON_WKT, 10000.0), + ) + db_connection.commit() + + row = cursor.execute("SELECT name, geom_col, area FROM #pytest_geometry_mixed;").fetchone() + assert row[0] == "Square", "Name should match" + assert isinstance(row[1], bytes), "Geometry should be bytes" + assert row[2] == 10000.0, "Area should match" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_mixed;") + db_connection.commit() + + +# ==================== HIERARCHYID TYPE TESTS ==================== + + +def test_hierarchyid_basic_insert_fetch(cursor, db_connection): + """Test insert and fetch of a basic hierarchyid value.""" + try: + cursor.execute( + "CREATE TABLE #pytest_hierarchyid_basic (id INT PRIMARY KEY IDENTITY(1,1), node HIERARCHYID NULL);" + ) + db_connection.commit() + + # Insert using hierarchyid::Parse + cursor.execute( + "INSERT INTO #pytest_hierarchyid_basic (node) VALUES (hierarchyid::Parse(?));", + "/1/2/3/", + ) + db_connection.commit() + + # Fetch as binary (default behavior) + row = cursor.execute("SELECT node FROM #pytest_hierarchyid_basic;").fetchone() + assert row[0] is not None, "Hierarchyid value should not be None" + assert isinstance(row[0], bytes), "Hierarchyid should be returned as bytes" + assert len(row[0]) > 0, "Hierarchyid binary should have content" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_basic;") + db_connection.commit() + + +def test_hierarchyid_as_string(cursor, db_connection): + """Test fetching hierarchyid as string using ToString().""" + try: + cursor.execute( + "CREATE TABLE #pytest_hierarchyid_string (id INT PRIMARY KEY IDENTITY(1,1), node HIERARCHYID NULL);" + ) + db_connection.commit() + + cursor.execute( + "INSERT INTO #pytest_hierarchyid_string (node) VALUES (hierarchyid::Parse(?));", + "/1/2/3/", + ) + db_connection.commit() + + # Fetch as string using ToString() + row = cursor.execute( + "SELECT node.ToString() as path FROM #pytest_hierarchyid_string;" + ).fetchone() + assert row[0] is not None, "Hierarchyid string should not be None" + assert row[0] == "/1/2/3/", "Hierarchyid path should match" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_string;") + db_connection.commit() + + +def test_hierarchyid_null_value(cursor, db_connection): + """Test insert and fetch of NULL hierarchyid values.""" + try: + cursor.execute( + "CREATE TABLE #pytest_hierarchyid_null (id INT PRIMARY 
KEY IDENTITY(1,1), node HIERARCHYID NULL);" + ) + db_connection.commit() + + cursor.execute("INSERT INTO #pytest_hierarchyid_null (node) VALUES (?);", None) + db_connection.commit() + + row = cursor.execute("SELECT node FROM #pytest_hierarchyid_null;").fetchone() + assert row[0] is None, "NULL hierarchyid should be returned as None" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_null;") + db_connection.commit() + + +def test_hierarchyid_fetchall(cursor, db_connection): + """Test fetchall with hierarchyid columns.""" + try: + cursor.execute( + "CREATE TABLE #pytest_hierarchyid_fetchall (id INT PRIMARY KEY IDENTITY(1,1), node HIERARCHYID NULL);" + ) + db_connection.commit() + + # Insert multiple rows with different hierarchy levels + paths = ["/1/", "/1/1/", "/1/2/", "/2/", "/2/1/"] + for path in paths: + cursor.execute( + "INSERT INTO #pytest_hierarchyid_fetchall (node) VALUES (hierarchyid::Parse(?));", + path, + ) + db_connection.commit() + + cursor.execute("SELECT node FROM #pytest_hierarchyid_fetchall;") + rows = cursor.fetchall() + assert isinstance(rows, list), "fetchall should return a list" + assert len(rows) == len(paths), f"fetchall should return {len(paths)} rows" + for row in rows: + assert isinstance(row[0], bytes), "Each hierarchyid should be bytes" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_fetchall;") + db_connection.commit() + + +def test_hierarchyid_methods(cursor, db_connection): + """Test various hierarchyid methods (GetLevel, GetAncestor, IsDescendantOf).""" + try: + cursor.execute( + "CREATE TABLE #pytest_hierarchyid_methods (id INT PRIMARY KEY IDENTITY(1,1), node HIERARCHYID NULL);" + ) + db_connection.commit() + + cursor.execute( + "INSERT INTO #pytest_hierarchyid_methods (node) VALUES (hierarchyid::Parse(?));", + "/1/2/3/", + ) + db_connection.commit() + + # Test GetLevel - /1/2/3/ is at level 3 + row = cursor.execute( + "SELECT node.GetLevel() as level FROM #pytest_hierarchyid_methods;" + ).fetchone() + assert row[0] == 3, "Level should be 3" + + # Test GetAncestor - parent of /1/2/3/ is /1/2/ + row = cursor.execute( + "SELECT node.GetAncestor(1).ToString() as parent FROM #pytest_hierarchyid_methods;" + ).fetchone() + assert row[0] == "/1/2/", "Parent should be /1/2/" + + # Test IsDescendantOf + row = cursor.execute( + "SELECT node.IsDescendantOf(hierarchyid::Parse('/1/')) as is_descendant FROM #pytest_hierarchyid_methods;" + ).fetchone() + assert row[0] == 1, "Node should be descendant of /1/" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_methods;") + db_connection.commit() + + +def test_hierarchyid_description_metadata(cursor, db_connection): + """Test cursor.description for hierarchyid columns.""" + try: + cursor.execute( + "CREATE TABLE #pytest_hierarchyid_desc (id INT PRIMARY KEY, node HIERARCHYID NULL);" + ) + db_connection.commit() + + cursor.execute("SELECT id, node FROM #pytest_hierarchyid_desc;") + desc = cursor.description + + assert len(desc) == 2, "Should have 2 columns in description" + assert desc[0][0] == "id", "First column should be 'id'" + assert desc[1][0] == "node", "Second column should be 'node'" + # Hierarchyid uses SQL_SS_UDT (-151) + assert int(desc[1][1]) == -151, "Hierarchyid type should be SQL_SS_UDT (-151)" + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_desc;") + db_connection.commit() + + +def test_hierarchyid_tree_structure(cursor, db_connection): + """Test hierarchyid with a typical org chart tree structure.""" + try: + 
+        cursor.execute("""CREATE TABLE #pytest_hierarchyid_tree (
+            id INT PRIMARY KEY IDENTITY(1,1),
+            name NVARCHAR(100),
+            node HIERARCHYID NULL
+        );""")
+        db_connection.commit()
+
+        # Build an org chart
+        org_data = [
+            ("CEO", "/"),
+            ("VP Engineering", "/1/"),
+            ("VP Sales", "/2/"),
+            ("Dev Manager", "/1/1/"),
+            ("QA Manager", "/1/2/"),
+            ("Senior Dev", "/1/1/1/"),
+            ("Junior Dev", "/1/1/2/"),
+        ]
+
+        for name, path in org_data:
+            cursor.execute(
+                "INSERT INTO #pytest_hierarchyid_tree (name, node) VALUES (?, hierarchyid::Parse(?));",
+                (name, path),
+            )
+        db_connection.commit()
+
+        # Query all descendants of VP Engineering
+        rows = cursor.execute("""SELECT name, node.ToString() as path
+            FROM #pytest_hierarchyid_tree
+            WHERE node.IsDescendantOf(hierarchyid::Parse('/1/')) = 1
+            ORDER BY node;""").fetchall()
+
+        assert len(rows) == 5, "Should have 5 employees under VP Engineering (including self)"
+        assert rows[0][0] == "VP Engineering", "First should be VP Engineering"
+
+        # Query direct reports of Dev Manager
+        rows = cursor.execute("""SELECT name, node.ToString() as path
+            FROM #pytest_hierarchyid_tree
+            WHERE node.GetAncestor(1) = hierarchyid::Parse('/1/1/')
+            ORDER BY node;""").fetchall()
+
+        assert len(rows) == 2, "Dev Manager should have 2 direct reports"
+        names = [r[0] for r in rows]
+        assert "Senior Dev" in names and "Junior Dev" in names
+
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_tree;")
+        db_connection.commit()
+
+
+def test_hierarchyid_mixed_with_other_types(cursor, db_connection):
+    """Test hierarchyid columns mixed with other data types."""
+    try:
+        cursor.execute("""CREATE TABLE #pytest_hierarchyid_mixed (
+            id INT PRIMARY KEY IDENTITY(1,1),
+            name NVARCHAR(100),
+            node HIERARCHYID NULL,
+            salary DECIMAL(10,2)
+        );""")
+        db_connection.commit()
+
+        cursor.execute(
+            "INSERT INTO #pytest_hierarchyid_mixed (name, node, salary) VALUES (?, hierarchyid::Parse(?), ?);",
+            ("Manager", "/1/", 75000.00),
+        )
+        db_connection.commit()
+
+        row = cursor.execute("SELECT name, node, salary FROM #pytest_hierarchyid_mixed;").fetchone()
+        assert row[0] == "Manager", "Name should match"
+        assert isinstance(row[1], bytes), "Hierarchyid should be bytes"
+        assert float(row[2]) == 75000.00, "Salary should match"
+
+    finally:
+        cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_mixed;")
+        db_connection.commit()
+
+
+# ==================== SPATIAL TYPE ERROR HANDLING TESTS ====================
+
+
+def test_geography_invalid_wkt_parsing(cursor, db_connection):
+    """
+    Test behavior when geography conversion/parsing fails with invalid WKT.
+
+    SQL Server raises an error when attempting to create a geography from
+    invalid Well-Known Text (WKT) format.
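+
+    For contrast, geography::STGeomFromText('POINT(-122.349 47.651)', 4326)
+    parses cleanly: the parentheses balance, the type name is known, and the
+    latitude lies within the geodetic range [-90, 90]. Each case below breaks
+    exactly one of those rules.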
+ """ + cursor.execute( + "CREATE TABLE #pytest_geography_invalid (id INT PRIMARY KEY IDENTITY(1,1), geo_col GEOGRAPHY NULL);" + ) + db_connection.commit() + + try: + # Test 1: Invalid WKT format - missing closing parenthesis + invalid_wkt1 = "POINT(-122.34900 47.65100" # Missing closing paren + with pytest.raises(mssql_python.DatabaseError): + cursor.execute( + "INSERT INTO #pytest_geography_invalid (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + invalid_wkt1, + ) + db_connection.rollback() + + # Test 2: Invalid WKT format - not a valid geometry type + invalid_wkt2 = "INVALIDTYPE(0 0)" + with pytest.raises(mssql_python.DatabaseError): + cursor.execute( + "INSERT INTO #pytest_geography_invalid (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + invalid_wkt2, + ) + db_connection.rollback() + + # Test 3: Invalid coordinates for geography (latitude > 90) + # Geography uses geodetic coordinates where latitude must be between -90 and 90 + invalid_coords_wkt = "POINT(0 100)" # Latitude 100 is invalid + with pytest.raises(mssql_python.DatabaseError): + cursor.execute( + "INSERT INTO #pytest_geography_invalid (geo_col) VALUES (geography::STGeomFromText(?, 4326));", + invalid_coords_wkt, + ) + db_connection.rollback() + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geography_invalid;") + db_connection.commit() + + +def test_geometry_invalid_wkt_parsing(cursor, db_connection): + """ + Test behavior when geometry conversion/parsing fails with invalid WKT. + + Geometry (planar coordinates) is more lenient than geography but still + requires valid WKT format. + """ + cursor.execute( + "CREATE TABLE #pytest_geometry_invalid (id INT PRIMARY KEY IDENTITY(1,1), geom_col GEOMETRY NULL);" + ) + db_connection.commit() + + try: + # Test 1: Invalid WKT format - missing coordinates + invalid_wkt1 = "POINT()" + with pytest.raises(mssql_python.DatabaseError): + cursor.execute( + "INSERT INTO #pytest_geometry_invalid (geom_col) VALUES (geometry::STGeomFromText(?, 0));", + invalid_wkt1, + ) + db_connection.rollback() + + # Test 2: Invalid WKT format - incomplete polygon (not closed) + invalid_wkt2 = "POLYGON((0 0, 100 0, 100 100))" # Not closed (first/last points differ) + with pytest.raises(mssql_python.DatabaseError): + cursor.execute( + "INSERT INTO #pytest_geometry_invalid (geom_col) VALUES (geometry::STGeomFromText(?, 0));", + invalid_wkt2, + ) + db_connection.rollback() + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_geometry_invalid;") + db_connection.commit() + + +def test_hierarchyid_invalid_parsing(cursor, db_connection): + """ + Test behavior when hierarchyid parsing fails with invalid path. 
+ """ + cursor.execute( + "CREATE TABLE #pytest_hierarchyid_invalid (id INT PRIMARY KEY IDENTITY(1,1), node HIERARCHYID NULL);" + ) + db_connection.commit() + + try: + # Test 1: Invalid hierarchyid format - letters where numbers expected + invalid_path1 = "/abc/" + with pytest.raises(mssql_python.DatabaseError): + cursor.execute( + "INSERT INTO #pytest_hierarchyid_invalid (node) VALUES (hierarchyid::Parse(?));", + invalid_path1, + ) + db_connection.rollback() + + # Test 2: Invalid hierarchyid format - missing leading slash + invalid_path2 = "1/2/" # Missing leading slash + with pytest.raises(mssql_python.DatabaseError): + cursor.execute( + "INSERT INTO #pytest_hierarchyid_invalid (node) VALUES (hierarchyid::Parse(?));", + invalid_path2, + ) + db_connection.rollback() + + finally: + cursor.execute("DROP TABLE IF EXISTS #pytest_hierarchyid_invalid;") + db_connection.commit() + + +# ==================== THREAD SAFETY TESTS ==================== + + +def test_column_metadata_thread_safety_concurrent_cursors(db_connection, conn_str): + """ + Test thread safety of _column_metadata with multiple cursors in concurrent threads. + + Validates: + - Multiple threads can safely create connections and cursors + - Each cursor's _column_metadata remains isolated and valid + - No race conditions between execute() setting metadata and fetchall() reading it + + This tests the _column_metadata instance attribute that is set during + _initialize_description() and read during _build_converter_map(). + + Note: Each thread uses its own connection because SQL Server doesn't support + Multiple Active Result Sets (MARS) by default. The test still validates that + _column_metadata works correctly under concurrent load. + """ + import threading + from mssql_python import connect + + # Track results and errors from each thread + results = {} + errors = [] + lock = threading.Lock() + + def worker(thread_id, table_suffix): + """Worker that creates connection, cursor, executes queries, and verifies metadata.""" + # Each thread gets its own connection (required - SQL Server doesn't support MARS) + thread_conn = None + cursor = None + try: + thread_conn = connect(conn_str) + cursor = thread_conn.cursor() + + try: + # Create a unique temp table for this thread + table_name = f"#pytest_thread_meta_{table_suffix}" + cursor.execute(f"DROP TABLE IF EXISTS {table_name};") + + # Create table with distinct column structure for this thread + cursor.execute(f""" + CREATE TABLE {table_name} ( + thread_id INT, + col_{table_suffix}_a NVARCHAR(100), + col_{table_suffix}_b INT, + col_{table_suffix}_c FLOAT + ); + """) + thread_conn.commit() + + # Insert test data + cursor.execute(f""" + INSERT INTO {table_name} VALUES + ({thread_id}, 'data_{thread_id}_1', {thread_id * 100}, {thread_id * 1.5}), + ({thread_id}, 'data_{thread_id}_2', {thread_id * 200}, {thread_id * 2.5}); + """) + thread_conn.commit() + + # Execute SELECT and verify description metadata is correct + cursor.execute(f"SELECT * FROM {table_name} ORDER BY col_{table_suffix}_b;") + + # Verify cursor has correct description for THIS query + desc = cursor.description + assert desc is not None, f"Thread {thread_id}: description should not be None" + assert len(desc) == 4, f"Thread {thread_id}: should have 4 columns" + + # Verify column names are correct for this thread's table + col_names = [d[0].lower() for d in desc] + expected_names = [ + "thread_id", + f"col_{table_suffix}_a", + f"col_{table_suffix}_b", + f"col_{table_suffix}_c", + ] + assert col_names == expected_names, f"Thread 
{thread_id}: column names should match" + + # Fetch all rows and verify data + rows = cursor.fetchall() + assert len(rows) == 2, f"Thread {thread_id}: should have 2 rows" + assert rows[0][0] == thread_id, f"Thread {thread_id}: thread_id column should match" + + # Verify _column_metadata is set (internal attribute) + assert ( + cursor._column_metadata is not None + ), f"Thread {thread_id}: _column_metadata should be set" + + # Clean up + cursor.execute(f"DROP TABLE IF EXISTS {table_name};") + thread_conn.commit() + + with lock: + results[thread_id] = { + "success": True, + "col_count": len(desc), + "row_count": len(rows), + } + + finally: + if cursor: + cursor.close() + if thread_conn: + thread_conn.close() + + except Exception as e: + with lock: + errors.append((thread_id, str(e))) + + # Create and start multiple threads + num_threads = 5 + threads = [] + + for i in range(num_threads): + t = threading.Thread(target=worker, args=(i, f"t{i}")) + threads.append(t) + + # Start all threads at roughly the same time + for t in threads: + t.start() + + # Wait for all threads to complete + for t in threads: + t.join(timeout=30) # 30 second timeout per thread + + # Verify no errors occurred + assert len(errors) == 0, f"Thread errors occurred: {errors}" + + # Verify all threads completed successfully + assert len(results) == num_threads, f"Expected {num_threads} results, got {len(results)}" + + for thread_id, result in results.items(): + assert result["success"], f"Thread {thread_id} did not succeed" + assert result["col_count"] == 4, f"Thread {thread_id} had wrong column count" + assert result["row_count"] == 2, f"Thread {thread_id} had wrong row count" + + +def test_column_metadata_isolation_sequential_queries(cursor, db_connection): + """ + Test that _column_metadata is correctly updated between sequential queries. + + Verifies that each execute() call properly replaces the previous metadata, + ensuring no stale data leaks between queries. 
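+
+    For example, after executing "SELECT 1 as col_a, 'hello' as col_b" the
+    metadata must describe exactly two columns, even though an earlier query
+    on the same cursor may have produced a wider result set.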
+ """ + try: + # Query 1: Simple 2-column query + cursor.execute("SELECT 1 as col_a, 'hello' as col_b;") + desc1 = cursor.description + meta1 = cursor._column_metadata + cursor.fetchall() + + assert len(desc1) == 2, "First query should have 2 columns" + assert meta1 is not None, "_column_metadata should be set" + + # Query 2: Different structure - 4 columns + cursor.execute("SELECT 1 as x, 2 as y, 3 as z, 4 as w;") + desc2 = cursor.description + meta2 = cursor._column_metadata + cursor.fetchall() + + assert len(desc2) == 4, "Second query should have 4 columns" + assert meta2 is not None, "_column_metadata should be set" + + # Verify the metadata was replaced, not appended + assert len(meta2) == 4, "_column_metadata should have 4 entries" + assert meta1 is not meta2, "_column_metadata should be a new object" + + # Query 3: Back to 2 columns with different names + cursor.execute("SELECT 'test' as different_name, 42.5 as another_col;") + desc3 = cursor.description + meta3 = cursor._column_metadata + cursor.fetchall() + + assert len(desc3) == 2, "Third query should have 2 columns" + assert len(meta3) == 2, "_column_metadata should have 2 entries" + + # Verify column names are from the new query + col_names = [d[0].lower() for d in desc3] + assert col_names == [ + "different_name", + "another_col", + ], "Column names should be from third query" + + except Exception as e: + pytest.fail(f"Column metadata isolation test failed: {e}") + + # ==================== CODE COVERAGE TEST CASES ==================== @@ -14030,7 +15348,10 @@ def test_row_output_converter_general_exception(cursor, db_connection): # Create a custom output converter that will raise a general exception def failing_converter(value): - if value == "test_value": + # This driver passes string values as UTF-16LE encoded bytes to output + # converters. For other column types or connection settings, the + # encoding may differ. + if value == b"t\x00e\x00s\x00t\x00_\x00v\x00a\x00l\x00u\x00e\x00": raise RuntimeError("Custom converter error for testing") return value