2 changes: 1 addition & 1 deletion main.py
@@ -15,4 +15,4 @@
print(f"Database ID: {row[0]}, Name: {row[1]}")

cursor.close()
-conn.close()
+conn.close()
187 changes: 186 additions & 1 deletion mssql_python/cursor.py
@@ -15,7 +15,7 @@
import uuid
import datetime
import warnings
-from typing import List, Union, Any, Optional, Tuple, Sequence, TYPE_CHECKING
+from typing import List, Union, Any, Optional, Tuple, Sequence, TYPE_CHECKING, Iterable
from mssql_python.constants import ConstantsDDBC as ddbc_sql_const, SQLTypes
from mssql_python.helpers import check_error
from mssql_python.logging import logger
@@ -2451,6 +2451,191 @@ def nextset(self) -> Union[bool, None]:
)
return True

def _bulkcopy(
self, table_name: str, data: Iterable[Union[Tuple, List]], **kwargs
): # pragma: no cover
"""
Perform a bulk copy operation for high-performance data loading.

Args:
table_name: Target table name (can include schema, e.g., 'dbo.MyTable').
The table must exist and the user must have INSERT permissions.

data: Iterable of tuples or lists containing row data to be inserted.

Data Format Requirements:
- Each element in the iterable represents one row
- Each row should be a tuple or list of column values
- Column order must match the target table's column order (by ordinal
position), unless column_mappings is specified
- The number of values in each row must match the number of columns
in the target table when mapping by ordinal position

**kwargs: Additional bulk copy options.

column_mappings (List[Tuple[int, str]], optional):
Maps source data column indices to target table column names.
Each tuple is (source_index, target_column_name) where:
- source_index: 0-based index of the column in the source data
- target_column_name: Name of the target column in the database table

When omitted: Columns are mapped by ordinal position (first data
column → first table column, second → second, etc.)

When specified: Only the mapped columns are inserted; unmapped
source columns are ignored, and unmapped target columns must
have default values or allow NULL.
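
batch_size (int, optional):
Number of rows to send per batch. Must be a positive integer when
provided; when omitted, the underlying library's default batching
behavior applies.

timeout (int or float, optional):
Operation timeout in seconds. Must be positive when provided.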

Returns:
Dictionary with bulk copy results including:
- rows_copied: Number of rows successfully copied
- batch_count: Number of batches processed
- elapsed_time: Time taken for the operation

Raises:
ImportError: If mssql_py_core library is not installed
TypeError: If data is None, not iterable, or is a string/bytes
ValueError: If table_name is empty or parameters are invalid
RuntimeError: If connection string is not available
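
Example (illustrative sketch; the table name, columns, and values
here are hypothetical, and the target table must already exist):

rows = [(1, "alice"), (2, "bob")]
result = cursor._bulkcopy("dbo.Users", rows, batch_size=1000)
print(result["rows_copied"])

# Insert only source columns 0 and 2, mapped to target columns by name:
cursor._bulkcopy(
"dbo.Users",
[(1, "ignored", "alice")],
column_mappings=[(0, "id"), (2, "name")],
)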
"""
try:
import mssql_py_core
except ImportError as exc:
raise ImportError(
"Bulk copy requires the mssql_py_core library which is not installed. "
"To install, run: pip install mssql_py_core "
"or install from the wheel file in the BCPRustWheel directory of the mssql-python repository: "
"pip install BCPRustWheel/mssql_py_core-<version>-<platform>.whl"
) from exc

# Validate inputs
if not table_name or not isinstance(table_name, str):
raise ValueError("table_name must be a non-empty string")

# Validate that data is iterable (but not a string or bytes, which are technically iterable)
if data is None:
raise TypeError("data must be an iterable of tuples or lists, got None")
if isinstance(data, (str, bytes)):
raise TypeError(
f"data must be an iterable of tuples or lists, got {type(data).__name__}. "
"Strings and bytes are not valid row collections."
)
if not hasattr(data, "__iter__"):
raise TypeError(
f"data must be an iterable of tuples or lists, got non-iterable {type(data).__name__}"
)

# Validate optional kwargs; kwargs are forwarded unchanged to the
# underlying bulkcopy call, so only values the caller provided are checked
batch_size = kwargs.get("batch_size")
if batch_size is not None and not isinstance(batch_size, int):
raise TypeError(
f"batch_size must be a positive integer, got {type(batch_size).__name__}"
)
if batch_size is not None and batch_size <= 0:
raise ValueError(f"batch_size must be positive, got {batch_size}")

timeout = kwargs.get("timeout")
if timeout is not None and not isinstance(timeout, (int, float)):
raise TypeError(f"timeout must be a positive number, got {type(timeout).__name__}")
if timeout is not None and timeout <= 0:
raise ValueError(f"timeout must be positive, got {timeout}")

# Get and parse connection string
if not hasattr(self.connection, "connection_str"):
raise RuntimeError("Connection string not available for bulk copy")

# Use the proper connection string parser that handles braced values
from mssql_python.connection_string_parser import _ConnectionStringParser

parser = _ConnectionStringParser(validate_keywords=False)
params = parser._parse(self.connection.connection_str)
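
# Illustrative sketch (hypothetical values): an input such as
#   "Server=localhost;Database=testdb;UID=sa;PWD={p;wd};Encrypt=yes"
# is expected to parse into lowercase-keyed params along the lines of
#   {"server": "localhost", "database": "testdb", "uid": "sa",
#    "pwd": "p;wd", "encrypt": "yes"}
# with the braces that escape the ';' in the password stripped.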

if not params.get("server"):
raise ValueError("SERVER parameter is required in connection string")

if not params.get("database"):
raise ValueError(
"DATABASE parameter is required in connection string for bulk copy. "
"Specify the target database explicitly to avoid accidentally writing to system databases."
)

# Build connection context for bulk copy library
# Note: Password is extracted separately to avoid storing it in the main context
# dict that could be accidentally logged or exposed in error messages.
trust_cert = params.get("trustservercertificate", "yes").lower() in ("yes", "true")

# Parse encryption setting from connection string
encrypt_param = params.get("encrypt")
if encrypt_param is not None:
encrypt_value = encrypt_param.strip().lower()
if encrypt_value in ("yes", "true", "mandatory", "required"):
encryption = "Required"
elif encrypt_value in ("no", "false", "optional"):
encryption = "Optional"
else:
# Pass through unrecognized values (e.g., "Strict") to the underlying driver
encryption = encrypt_param
else:
encryption = "Optional"

context = {
"server": params.get("server"),
"database": params.get("database"),
"user_name": params.get("uid", ""),
"trust_server_certificate": trust_cert,
"encryption": encryption,
}
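
# For illustration, the hypothetical connection string sketched above
# would yield (given the "yes" default for TrustServerCertificate):
#   {"server": "localhost", "database": "testdb", "user_name": "sa",
#    "trust_server_certificate": True, "encryption": "Required"}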

# Extract password separately to avoid storing it in generic context that may be logged
password = params.get("pwd", "")
pycore_context = dict(context)
pycore_context["password"] = password

pycore_connection = None
pycore_cursor = None
try:
pycore_connection = mssql_py_core.PyCoreConnection(pycore_context)
pycore_cursor = pycore_connection.cursor()

result = pycore_cursor.bulkcopy(table_name, iter(data), **kwargs)

return result

except Exception as e:
# Log the error for debugging (without exposing credentials)
logger.debug(
"Bulk copy operation failed for table '%s': %s: %s",
table_name,
type(e).__name__,
str(e),
)
# Re-raise without exposing connection context in the error chain
# to prevent credential leakage in stack traces. This assumes the
# exception type can be reconstructed from a single message string.
raise type(e)(str(e)) from None

finally:
# Clear sensitive data to minimize memory exposure
password = ""
if pycore_context:
pycore_context["password"] = ""
pycore_context["user_name"] = ""
# Clean up bulk copy resources
for resource in (pycore_cursor, pycore_connection):
if resource and hasattr(resource, "close"):
try:
resource.close()
except Exception as cleanup_error:
# Log cleanup errors at debug level to aid troubleshooting
# without masking the original exception
logger.debug(
"Failed to close bulk copy resource %s: %s",
type(resource).__name__,
cleanup_error,
)

def __enter__(self):
"""
Enter the runtime context for the cursor.