diff --git a/main.py b/main.py
index 2f8cf28c..7e56b2fe 100644
--- a/main.py
+++ b/main.py
@@ -15,4 +15,4 @@
 print(f"Database ID: {row[0]}, Name: {row[1]}")
 cursor.close()
-conn.close()
\ No newline at end of file
+conn.close()
diff --git a/mssql_python/cursor.py b/mssql_python/cursor.py
index dfd47375..222b265a 100644
--- a/mssql_python/cursor.py
+++ b/mssql_python/cursor.py
@@ -15,7 +15,7 @@
 import uuid
 import datetime
 import warnings
-from typing import List, Union, Any, Optional, Tuple, Sequence, TYPE_CHECKING
+from typing import List, Union, Any, Optional, Tuple, Sequence, TYPE_CHECKING, Iterable
 from mssql_python.constants import ConstantsDDBC as ddbc_sql_const, SQLTypes
 from mssql_python.helpers import check_error
 from mssql_python.logging import logger
@@ -2451,6 +2451,191 @@ def nextset(self) -> Union[bool, None]:
         )
         return True
 
+    def _bulkcopy(
+        self, table_name: str, data: Iterable[Union[Tuple, List]], **kwargs
+    ):  # pragma: no cover
+        """
+        Perform a bulk copy operation for high-performance data loading.
+
+        Args:
+            table_name: Target table name (can include schema, e.g., 'dbo.MyTable').
+                The table must exist and the user must have INSERT permissions.
+
+            data: Iterable of tuples or lists containing row data to be inserted.
+
+                Data Format Requirements:
+                - Each element in the iterable represents one row
+                - Each row should be a tuple or list of column values
+                - Column order must match the target table's column order (by ordinal
+                  position), unless column_mappings is specified
+                - The number of values in each row must match the number of columns
+                  in the target table
+
+            **kwargs: Additional bulk copy options.
+
+                batch_size (int, optional):
+                    Number of rows to send per batch. Defaults to 0, which lets
+                    the underlying library choose a batch size.
+
+                timeout (int or float, optional):
+                    Timeout for the operation. Defaults to 30. Must be positive.
+
+                column_mappings (List[Tuple[int, str]], optional):
+                    Maps source data column indices to target table column names.
+                    Each tuple is (source_index, target_column_name) where:
+                    - source_index: 0-based index of the column in the source data
+                    - target_column_name: Name of the target column in the database table
+
+                    When omitted: Columns are mapped by ordinal position (first data
+                    column → first table column, second → second, etc.)
+
+                    When specified: Only the mapped columns are inserted; unmapped
+                    source columns are ignored, and unmapped target columns must
+                    have default values or allow NULL.
+
+        Returns:
+            Dictionary with bulk copy results, including:
+            - rows_copied: Number of rows successfully copied
+            - batch_count: Number of batches processed
+            - elapsed_time: Time taken for the operation
+
+        Raises:
+            ImportError: If the mssql_py_core library is not installed
+            TypeError: If data is None, not iterable, or is a string/bytes
+            ValueError: If table_name is empty or parameters are invalid
+            RuntimeError: If the connection string is not available
+        """
+        try:
+            import mssql_py_core
+        except ImportError as exc:
+            raise ImportError(
+                "Bulk copy requires the mssql_py_core library, which is not installed. "
+                "To install, run: pip install mssql_py_core "
+                "or install from the wheel file in the BCPRustWheel directory of the "
+                "mssql-python repository: pip install BCPRustWheel/mssql_py_core-<version>-<platform>.whl"
+            ) from exc
+
+        # Validate inputs
+        if not table_name or not isinstance(table_name, str):
+            raise ValueError("table_name must be a non-empty string")
+
+        # Validate that data is iterable (but not a string or bytes, which are
+        # technically iterable)
+        if data is None:
+            raise TypeError("data must be an iterable of tuples or lists, got None")
+        if isinstance(data, (str, bytes)):
+            raise TypeError(
+                f"data must be an iterable of tuples or lists, got {type(data).__name__}. "
" + "Strings and bytes are not valid row collections." + ) + if not hasattr(data, "__iter__"): + raise TypeError( + f"data must be an iterable of tuples or lists, got non-iterable {type(data).__name__}" + ) + + # Extract and validate kwargs with defaults + batch_size = kwargs.get("batch_size", 0) + timeout = kwargs.get("timeout", 30) + + # Validate batch_size type and value + if not isinstance(batch_size, (int, float)): + raise TypeError( + f"batch_size must be a positive integer, got {type(batch_size).__name__}" + ) + if batch_size <= 0: + raise ValueError(f"batch_size must be positive, got {batch_size}") + + # Validate timeout type and value + if not isinstance(timeout, (int, float)): + raise TypeError(f"timeout must be a positive number, got {type(timeout).__name__}") + if timeout <= 0: + raise ValueError(f"timeout must be positive, got {timeout}") + + # Get and parse connection string + if not hasattr(self.connection, "connection_str"): + raise RuntimeError("Connection string not available for bulk copy") + + # Use the proper connection string parser that handles braced values + from mssql_python.connection_string_parser import _ConnectionStringParser + + parser = _ConnectionStringParser(validate_keywords=False) + params = parser._parse(self.connection.connection_str) + + if not params.get("server"): + raise ValueError("SERVER parameter is required in connection string") + + if not params.get("database"): + raise ValueError( + "DATABASE parameter is required in connection string for bulk copy. " + "Specify the target database explicitly to avoid accidentally writing to system databases." + ) + + # Build connection context for bulk copy library + # Note: Password is extracted separately to avoid storing it in the main context + # dict that could be accidentally logged or exposed in error messages. 
+        trust_cert = params.get("trustservercertificate", "yes").lower() in ("yes", "true")
+
+        # Parse encryption setting from connection string
+        encrypt_param = params.get("encrypt")
+        if encrypt_param is not None:
+            encrypt_value = encrypt_param.strip().lower()
+            if encrypt_value in ("yes", "true", "mandatory", "required"):
+                encryption = "Required"
+            elif encrypt_value in ("no", "false", "optional"):
+                encryption = "Optional"
+            else:
+                # Pass through unrecognized values (e.g., "Strict") to the underlying driver
+                encryption = encrypt_param
+        else:
+            encryption = "Optional"
+
+        context = {
+            "server": params.get("server"),
+            "database": params.get("database"),
+            "user_name": params.get("uid", ""),
+            "trust_server_certificate": trust_cert,
+            "encryption": encryption,
+        }
+
+        # Extract the password separately to avoid storing it in a generic context that may be logged
+        password = params.get("pwd", "")
+        pycore_context = dict(context)
+        pycore_context["password"] = password
+
+        pycore_connection = None
+        pycore_cursor = None
+        try:
+            pycore_connection = mssql_py_core.PyCoreConnection(pycore_context)
+            pycore_cursor = pycore_connection.cursor()
+
+            result = pycore_cursor.bulkcopy(table_name, iter(data), **kwargs)
+
+            return result
+
+        except Exception as e:
+            # Log the error for debugging (without exposing credentials)
+            logger.debug(
+                "Bulk copy operation failed for table '%s': %s: %s",
+                table_name,
+                type(e).__name__,
+                str(e),
+            )
+            # Re-raise without exposing connection context in the error chain
+            # to prevent credential leakage in stack traces
+            raise type(e)(str(e)) from None
+
+        finally:
+            # Clear sensitive data to minimize memory exposure
+            password = ""
+            if pycore_context:
+                pycore_context["password"] = ""
+                pycore_context["user_name"] = ""
+            # Clean up bulk copy resources
+            for resource in (pycore_cursor, pycore_connection):
+                if resource and hasattr(resource, "close"):
+                    try:
+                        resource.close()
+                    except Exception as cleanup_error:
+                        # Log cleanup errors at debug level to aid troubleshooting
+                        # without masking the original exception
+                        logger.debug(
+                            "Failed to close bulk copy resource %s: %s",
+                            type(resource).__name__,
+                            cleanup_error,
+                        )
+
     def __enter__(self):
         """
         Enter the runtime context for the cursor.
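
For reviewers, a minimal usage sketch of the new _bulkcopy API (not part of the diff). The table, rows, and option values are illustrative, and it assumes an open cursor whose connection string specifies both SERVER and DATABASE, as the validation above requires.

    # Hypothetical table: CREATE TABLE dbo.Users (id INT, name NVARCHAR(50), note NVARCHAR(50) NULL)
    rows = [
        (1, "Alice"),
        (2, "Bob"),
    ]

    # Map source column 0 -> "id" and 1 -> "name"; the unmapped "note" column
    # must then allow NULL or have a default. Omit column_mappings to map by
    # ordinal position instead.
    result = cursor._bulkcopy(
        "dbo.Users",
        rows,
        batch_size=1000,  # rows per batch; 0 (the default) lets the library choose
        timeout=60,
        column_mappings=[(0, "id"), (1, "name")],
    )
    print(result["rows_copied"], result["batch_count"], result["elapsed_time"])
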
diff --git a/tests/test_004_cursor.py b/tests/test_004_cursor.py
index f63972f1..1b5dc15f 100644
--- a/tests/test_004_cursor.py
+++ b/tests/test_004_cursor.py
@@ -14948,231 +14948,6 @@ def test_lob_binary_column_types(cursor, db_connection):
         db_connection.commit()
 
 
-def test_zero_length_complex_types(cursor, db_connection):
-    """Test zero-length data for complex types (covers lines 3531-3533)"""
-    try:
-        drop_table_if_exists(cursor, "#pytest_zero_length")
-        cursor.execute(
-            """
-            CREATE TABLE #pytest_zero_length (
-                id INT,
-                empty_varchar VARCHAR(100),
-                empty_nvarchar NVARCHAR(100),
-                empty_binary VARBINARY(100)
-            )
-            """
-        )
-        db_connection.commit()
-
-        # Insert empty (non-NULL) values
-        cursor.execute("INSERT INTO #pytest_zero_length VALUES (?, ?, ?, ?)", (1, "", "", b""))
-        db_connection.commit()
-
-        cursor.execute(
-            "SELECT id, empty_varchar, empty_nvarchar, empty_binary FROM #pytest_zero_length"
-        )
-        row = cursor.fetchone()
-
-        assert row[0] == 1, "ID should be 1"
-        assert row[1] == "", "Empty VARCHAR should be empty string"
-        assert row[2] == "", "Empty NVARCHAR should be empty string"
-        assert row[3] == b"", "Empty VARBINARY should be empty bytes"
-
-    except Exception as e:
-        pytest.fail(f"Zero-length complex types test failed: {e}")
-    finally:
-        drop_table_if_exists(cursor, "#pytest_zero_length")
-        db_connection.commit()
-
-
-def test_guid_with_nulls(cursor, db_connection):
-    """Test GUID type with NULL values"""
-    try:
-        drop_table_if_exists(cursor, "#pytest_guid_nulls")
-        cursor.execute(
-            """
-            CREATE TABLE #pytest_guid_nulls (
-                id INT,
-                guid_col UNIQUEIDENTIFIER
-            )
-            """
-        )
-        db_connection.commit()
-
-        # Insert NULL GUID
-        cursor.execute("INSERT INTO #pytest_guid_nulls VALUES (1, NULL)")
-        # Insert actual GUID
-        cursor.execute("INSERT INTO #pytest_guid_nulls VALUES (2, NEWID())")
-        db_connection.commit()
-
-        cursor.execute("SELECT id, guid_col FROM #pytest_guid_nulls ORDER BY id")
-        rows = cursor.fetchall()
-
-        assert len(rows) == 2, "Should have exactly 2 rows"
-        assert rows[0][1] is None, "First GUID should be NULL"
-        assert rows[1][1] is not None, "Second GUID should not be NULL"
-
-    except Exception as e:
-        pytest.fail(f"GUID with NULLs test failed: {e}")
-    finally:
-        drop_table_if_exists(cursor, "#pytest_guid_nulls")
-        db_connection.commit()
-
-
-def test_datetimeoffset_with_nulls(cursor, db_connection):
-    """Test DATETIMEOFFSET type with NULL values"""
-    try:
-        drop_table_if_exists(cursor, "#pytest_dto_nulls")
-        cursor.execute(
-            """
-            CREATE TABLE #pytest_dto_nulls (
-                id INT,
-                dto_col DATETIMEOFFSET
-            )
-            """
-        )
-        db_connection.commit()
-
-        # Insert NULL DATETIMEOFFSET
-        cursor.execute("INSERT INTO #pytest_dto_nulls VALUES (1, NULL)")
-        # Insert actual DATETIMEOFFSET
-        cursor.execute("INSERT INTO #pytest_dto_nulls VALUES (2, SYSDATETIMEOFFSET())")
-        db_connection.commit()
-
-        cursor.execute("SELECT id, dto_col FROM #pytest_dto_nulls ORDER BY id")
-        rows = cursor.fetchall()
-
-        assert len(rows) == 2, "Should have exactly 2 rows"
-        assert rows[0][1] is None, "First DATETIMEOFFSET should be NULL"
-        assert rows[1][1] is not None, "Second DATETIMEOFFSET should not be NULL"
-
-    except Exception as e:
-        pytest.fail(f"DATETIMEOFFSET with NULLs test failed: {e}")
-    finally:
-        drop_table_if_exists(cursor, "#pytest_dto_nulls")
-        db_connection.commit()
-
-
-def test_decimal_conversion_edge_cases(cursor, db_connection):
-    """Test DECIMAL/NUMERIC type conversion including edge cases"""
-    try:
-        drop_table_if_exists(cursor, "#pytest_decimal_edge")
-        cursor.execute(
-            """
-            CREATE TABLE #pytest_decimal_edge (
-                id INT,
-                dec_col DECIMAL(18, 4)
-            )
-            """
-        )
-        db_connection.commit()
-
-        # Insert various decimal values including edge cases
-        test_values = [
-            (1, "123.4567"),
-            (2, "0.0001"),
-            (3, "-999999999999.9999"),
-            (4, "999999999999.9999"),
-            (5, "0.0000"),
-        ]
-
-        for id_val, dec_val in test_values:
-            cursor.execute(
-                "INSERT INTO #pytest_decimal_edge VALUES (?, ?)", (id_val, decimal.Decimal(dec_val))
-            )
-
-        # Also insert NULL
-        cursor.execute("INSERT INTO #pytest_decimal_edge VALUES (6, NULL)")
-        db_connection.commit()
-
-        cursor.execute("SELECT id, dec_col FROM #pytest_decimal_edge ORDER BY id")
-        rows = cursor.fetchall()
-
-        assert len(rows) == 6, "Should have exactly 6 rows"
-
-        # Verify the values
-        for i, (id_val, expected_str) in enumerate(test_values):
-            assert rows[i][0] == id_val, f"Row {i} ID should be {id_val}"
-            assert rows[i][1] == decimal.Decimal(
-                expected_str
-            ), f"Row {i} decimal should match {expected_str}"
-
-        # Verify NULL
-        assert rows[5][0] == 6, "Last row ID should be 6"
-        assert rows[5][1] is None, "Last decimal should be NULL"
-
-    except Exception as e:
-        pytest.fail(f"Decimal conversion edge cases test failed: {e}")
-    finally:
-        drop_table_if_exists(cursor, "#pytest_decimal_edge")
-        db_connection.commit()
-
-
-def test_fixed_length_char_type(cursor, db_connection):
-    """Test SQL_CHAR (fixed-length CHAR) column processor path (Lines 3464-3467)"""
-    try:
-        cursor.execute("CREATE TABLE #pytest_char_test (id INT, char_col CHAR(10))")
-        cursor.execute("INSERT INTO #pytest_char_test VALUES (1, 'hello')")
-        cursor.execute("INSERT INTO #pytest_char_test VALUES (2, 'world')")
-
-        cursor.execute("SELECT char_col FROM #pytest_char_test ORDER BY id")
-        rows = cursor.fetchall()
-
-        # CHAR pads with spaces to fixed length
-        assert len(rows) == 2, "Should fetch 2 rows"
-        assert rows[0][0].rstrip() == "hello", "First CHAR value should be 'hello'"
-        assert rows[1][0].rstrip() == "world", "Second CHAR value should be 'world'"
-
-        cursor.execute("DROP TABLE #pytest_char_test")
-    except Exception as e:
-        pytest.fail(f"Fixed-length CHAR test failed: {e}")
-
-
-def test_fixed_length_nchar_type(cursor, db_connection):
-    """Test SQL_WCHAR (fixed-length NCHAR) column processor path (Lines 3469-3472)"""
-    try:
-        cursor.execute("CREATE TABLE #pytest_nchar_test (id INT, nchar_col NCHAR(10))")
-        cursor.execute("INSERT INTO #pytest_nchar_test VALUES (1, N'hello')")
-        cursor.execute("INSERT INTO #pytest_nchar_test VALUES (2, N'世界')")  # Unicode test
-
-        cursor.execute("SELECT nchar_col FROM #pytest_nchar_test ORDER BY id")
-        rows = cursor.fetchall()
-
-        # NCHAR pads with spaces to fixed length
-        assert len(rows) == 2, "Should fetch 2 rows"
-        assert rows[0][0].rstrip() == "hello", "First NCHAR value should be 'hello'"
-        assert rows[1][0].rstrip() == "世界", "Second NCHAR value should be '世界'"
-
-        cursor.execute("DROP TABLE #pytest_nchar_test")
-    except Exception as e:
-        pytest.fail(f"Fixed-length NCHAR test failed: {e}")
-
-
-def test_fixed_length_binary_type(cursor, db_connection):
-    """Test SQL_BINARY (fixed-length BINARY) column processor path (Lines 3474-3477)"""
-    try:
-        cursor.execute("CREATE TABLE #pytest_binary_test (id INT, binary_col BINARY(8))")
-        cursor.execute("INSERT INTO #pytest_binary_test VALUES (1, 0x0102030405)")
-        cursor.execute("INSERT INTO #pytest_binary_test VALUES (2, 0xAABBCCDD)")
-
-        cursor.execute("SELECT binary_col FROM #pytest_binary_test ORDER BY id")
-        rows = cursor.fetchall()
-
-        # BINARY pads with zeros to fixed length (8 bytes)
-        assert len(rows) == 2, "Should fetch 2 rows"
-        assert len(rows[0][0]) == 8, "BINARY(8) should be 8 bytes"
-        assert len(rows[1][0]) == 8, "BINARY(8) should be 8 bytes"
-        # First 5 bytes should match, rest padded with zeros
-        assert (
-            rows[0][0][:5] == b"\x01\x02\x03\x04\x05"
-        ), "First BINARY value should start with inserted bytes"
-        assert rows[0][0][5:] == b"\x00\x00\x00", "BINARY should be zero-padded"
-
-        cursor.execute("DROP TABLE #pytest_binary_test")
-    except Exception as e:
-        pytest.fail(f"Fixed-length BINARY test failed: {e}")
-
-
 def test_fetchall_with_integrity_constraint(cursor, db_connection):
     """
     Test that UNIQUE constraint errors are appropriately triggered for multi-row INSERT
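
Since the diff adds _bulkcopy without accompanying tests, here is a hedged sketch of validation-path tests that could be added to test_004_cursor.py. Test names are illustrative; the cases assume mssql_py_core is installed (the import check runs before argument validation) and fail fast before any bulk copy connection is attempted.

    import pytest


    def test_bulkcopy_rejects_string_data(cursor):
        # Strings are iterable but are not valid row collections
        with pytest.raises(TypeError):
            cursor._bulkcopy("dbo.AnyTable", "not-rows")


    def test_bulkcopy_rejects_none_data(cursor):
        with pytest.raises(TypeError):
            cursor._bulkcopy("dbo.AnyTable", None)


    def test_bulkcopy_rejects_empty_table_name(cursor):
        with pytest.raises(ValueError):
            cursor._bulkcopy("", [(1, "a")])


    def test_bulkcopy_rejects_negative_batch_size(cursor):
        with pytest.raises(ValueError):
            cursor._bulkcopy("dbo.AnyTable", [(1, "a")], batch_size=-1)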