diff --git a/.github/workflows/pre-commit-format.yml b/.github/workflows/pre-commit-format.yml index 786a5e5..6228e1c 100644 --- a/.github/workflows/pre-commit-format.yml +++ b/.github/workflows/pre-commit-format.yml @@ -16,7 +16,7 @@ concurrency: jobs: formatting-checks: - runs-on: ubuntu-20.04 + runs-on: ubuntu-22.04 steps: - uses: actions/checkout@v4 diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs index cb4d8f6..2f91c4c 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs @@ -100,7 +100,6 @@ public async Task TestInsertAlignedStringRecord() var res_cnt = 0; while (res.HasNext()) { - res.Next(); res_cnt++; } Console.WriteLine(res_cnt + " " + fetchSize * processedSize); @@ -185,8 +184,10 @@ public async Task TestInsertAlignedRecords() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); + Console.WriteLine(rowRecords); + + System.Diagnostics.Debug.Assert(true); await res.Close(); Console.WriteLine(status); @@ -216,8 +217,8 @@ public async Task TestInsertAlignedRecords() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; + Console.WriteLine(res.Next()); } await res.Close(); @@ -265,8 +266,7 @@ public async Task TestInsertAlignedStringRecords() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); @@ -299,7 +299,7 @@ public async Task TestInsertAlignedStringRecords() var res_count = 0; while (res.HasNext()) { - res.Next(); + Console.WriteLine(res.Next()); res_count += 1; } @@ -386,8 +386,7 @@ public async Task TestInsertAlignedRecordsOfOneDevice() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); rowRecords = new List() { }; @@ -409,7 +408,6 @@ public async Task TestInsertAlignedRecordsOfOneDevice() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } @@ -454,8 +452,7 @@ public async Task TestInsertAlignedStringRecordsOfOneDevice() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -483,7 +480,6 @@ public async Task TestInsertAlignedStringRecordsOfOneDevice() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs index 77050ad..ec1dcd2 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedTablet.cs @@ -54,8 +54,7 @@ public async Task TestInsertAlignedTablet() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -86,7 +85,6 @@ public async Task TestInsertAlignedTablet() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } @@ -149,8 +147,7 @@ public async Task TestInsertAlignedTablets() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); // large data test var tasks = new List>(); @@ -179,7 +176,6 @@ public async Task TestInsertAlignedTablets() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs index 4cf68f4..38431a0 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Record.cs @@ -113,7 +113,6 @@ public async Task TestInsertStringRecord() var res_cnt = 0; while (res.HasNext()) { - res.Next(); res_cnt++; } Console.WriteLine(res_cnt + " " + fetchSize * processedSize); @@ -149,8 +148,7 @@ public async Task TestInsertStrRecord() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<2"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); @@ -173,7 +171,6 @@ public async Task TestInsertStrRecord() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } @@ -256,8 +253,7 @@ public async Task TestInsertRecords() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine(status); @@ -282,12 +278,13 @@ public async Task TestInsertRecords() Task.WaitAll(tasks.ToArray()); res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); - res.ShowTableNames(); + var record_count = fetchSize * processedSize; + + res.ShowTableNames(); var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } @@ -306,10 +303,10 @@ public async Task TestInsertRecords() break; } - Console.WriteLine($"{testDatabaseName}.{testDevice}.{row.Measurements[0]} {testMeasurements[3]}"); - System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[3]}" == row.Measurements[0]); - System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[1]}" == row.Measurements[1]); - System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[2]}" == row.Measurements[2]); + Console.WriteLine($"{testDatabaseName}.{testDevice}.{row.Measurements[1]} {testMeasurements[3]}"); + System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[3]}" == row.Measurements[1]); + System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[1]}" == row.Measurements[2]); + System.Diagnostics.Debug.Assert($"{testDatabaseName}.{testDevice}.{testMeasurements[2]}" == row.Measurements[3]); status = await session_pool.DeleteDatabaseAsync(testDatabaseName); System.Diagnostics.Debug.Assert(status == 0); @@ -351,8 +348,7 @@ public async Task TestInsertStringRecords() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); @@ -386,7 +382,7 @@ public async Task TestInsertStringRecords() var res_count = 0; while (res.HasNext()) { - res.Next(); + Console.WriteLine(res.Next()); res_count += 1; } @@ -468,8 +464,7 @@ public async Task TestInsertRecordsOfOneDevice() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -492,7 +487,6 @@ public async Task TestInsertRecordsOfOneDevice() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } @@ -543,8 +537,7 @@ public async Task TestInsertStringRecordsOfOneDevice() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -572,7 +565,6 @@ public async Task TestInsertStringRecordsOfOneDevice() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs index bf2ae17..657d5aa 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.Tablet.cs @@ -55,8 +55,7 @@ public async Task TestInsertTablet() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -86,7 +85,6 @@ public async Task TestInsertTablet() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } @@ -149,8 +147,7 @@ public async Task TestInsertTablets() // System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); // large data test @@ -180,7 +177,6 @@ public async Task TestInsertTablets() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs index 9f2a61c..1506c96 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.TestNetwork.cs @@ -144,8 +144,7 @@ public async Task TestTestInsertRecords() // System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); @@ -174,7 +173,6 @@ public async Task TestTestInsertRecords() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } @@ -214,8 +212,7 @@ public async Task TestTestInsertTablet() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<15"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -245,7 +242,6 @@ public async Task TestTestInsertTablet() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } @@ -307,8 +303,7 @@ public async Task TestTestInsertTablets() // System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevices[1]) + " where time<15"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); // large data test @@ -338,7 +333,6 @@ public async Task TestTestInsertTablets() var res_count = 0; while (res.HasNext()) { - res.Next(); res_count += 1; } diff --git a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs index 8b27c97..495008b 100644 --- a/samples/Apache.IoTDB.Samples/SessionPoolTest.cs +++ b/samples/Apache.IoTDB.Samples/SessionPoolTest.cs @@ -224,7 +224,7 @@ public async Task TestGetTimeZone() await session_pool.DeleteDatabaseAsync(testDatabaseName); System.Diagnostics.Debug.Assert(session_pool.IsOpen()); var time_zone = await session_pool.GetTimeZone(); - System.Diagnostics.Debug.Assert(time_zone == "UTC+08:00"); + System.Diagnostics.Debug.Assert(time_zone == "Asia/Shanghai"); await session_pool.Close(); Console.WriteLine("TestGetTimeZone Passed!"); } @@ -324,8 +324,7 @@ public async Task TestDeleteData() System.Diagnostics.Debug.Assert(status == 0); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); var ts_path_lst = new List() @@ -336,8 +335,7 @@ public async Task TestDeleteData() await session_pool.DeleteDataAsync(ts_path_lst, 2, 3); res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); status = await session_pool.DeleteDatabaseAsync(testDatabaseName); @@ -374,8 +372,8 @@ await session_pool.ExecuteNonQueryStatementAsync( "insert into " + string.Format("{0}.{1}", testDatabaseName, testDevice) + "(timestamp, status, hardware) VALUES (7, true,'lz')"); var res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + + UtilsTest.PrintDataSetByString(res); await res.Close(); status = await session_pool.DeleteDatabaseAsync(testDatabaseName); @@ -452,33 +450,28 @@ await session_pool.ExecuteNonQueryStatementAsync( "insert into " + string.Format("{0}.{1}", testDatabaseName, testDevice) + "(timestamp, status, hardware) VALUES (7, true,'lz')"); var res = await session_pool.ExecuteQueryStatementAsync("show timeseries root"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine("SHOW TIMESERIES ROOT sql passed!"); res = await session_pool.ExecuteQueryStatementAsync("show devices"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine("SHOW DEVICES sql passed!"); res = await session_pool.ExecuteQueryStatementAsync($"COUNT TIMESERIES {testDatabaseName}"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine("COUNT TIMESERIES root sql Passed"); res = await session_pool.ExecuteQueryStatementAsync("select * from root.ln.wf01 where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); Console.WriteLine("SELECT sql Passed"); res = await session_pool.ExecuteQueryStatementAsync( "select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice) + " where time<10"); - res.ShowTableNames(); - while (res.HasNext()) Console.WriteLine(res.Next()); + UtilsTest.PrintDataSetByString(res); await res.Close(); status = await session_pool.DeleteDatabaseAsync(testDatabaseName); @@ -521,7 +514,6 @@ public async Task TestRawDataQuery() var count = 0; while (res.HasNext()) { - var record = res.Next(); count++; } Console.WriteLine(count + " " + (fetchSize * processedSize - 10)); @@ -568,8 +560,7 @@ public async Task TestLastDataQuery() var count = 0; while (res.HasNext()) { - var record = res.Next(); - Console.WriteLine(record); + Console.WriteLine(count); count++; } Console.WriteLine(count + " " + (fetchSize * processedSize - 10)); @@ -621,13 +612,17 @@ public async Task TestMultiNodeDataFetch() // fetch data var paths = new List() { string.Format("{0}.{1}", device_id, testMeasurements[0]), string.Format("{0}.{1}", device_id, testMeasurements[1]) }; var res = await session_pool.ExecuteQueryStatementAsync("select * from " + string.Format("{0}.{1}", testDatabaseName, testDevice)); - res.ShowTableNames(); - var count = 0; - while (res.HasNext()) + + IReadOnlyList columns = res.GetColumnNames(); + foreach (string columnName in columns) { - var record = res.Next(); - count++; + Console.Write($"{columnName}\t"); } + Console.WriteLine(); + + var count = 0; + while (res.HasNext()) count++; + Console.WriteLine(count + " " + (fetchSize * processedSize * 4 + 783)); System.Diagnostics.Debug.Assert(count == fetchSize * processedSize * 4 + 783); await res.Close(); diff --git a/samples/Apache.IoTDB.Samples/UtilsTest.cs b/samples/Apache.IoTDB.Samples/UtilsTest.cs index 0670cd6..af3a46f 100644 --- a/samples/Apache.IoTDB.Samples/UtilsTest.cs +++ b/samples/Apache.IoTDB.Samples/UtilsTest.cs @@ -18,7 +18,9 @@ */ using System; +using System.Collections.Generic; using System.Diagnostics; +using Apache.IoTDB.DataStructure; namespace Apache.IoTDB.Samples { @@ -77,5 +79,106 @@ private void TestInvalidInputs() } Console.WriteLine("TestInvalidInputs passed."); } + + static public void PrintDataSetByType(SessionDataSet sessionDataSet) + { + IReadOnlyList columns = sessionDataSet.GetColumnNames(); + + foreach (string columnName in columns) + { + Console.Write($"{columnName}\t"); + } + Console.WriteLine(); + + while (sessionDataSet.HasNext()) + { + for (int i = 0; i < columns.Count; i++) + { + string columnName = columns[i]; + string typeStr = sessionDataSet.GetColumnTypes()[i]; + TSDataType dataType = Client.GetDataTypeByStr(typeStr); + + switch (dataType) + { + case TSDataType.BOOLEAN: + bool boolValue = sessionDataSet.GetBoolean(columnName); + Console.Write(boolValue); + break; + case TSDataType.INT32: + int intValue = sessionDataSet.GetInt(columnName); + Console.Write(intValue); + break; + case TSDataType.INT64: + case TSDataType.TIMESTAMP: + long longValue = sessionDataSet.GetLong(columnName); + Console.Write(longValue); + break; + case TSDataType.FLOAT: + float floatValue = sessionDataSet.GetFloat(columnName); + Console.Write(floatValue); + break; + case TSDataType.DOUBLE: + double doubleValue = sessionDataSet.GetDouble(columnName); + Console.Write(doubleValue); + break; + case TSDataType.TEXT: + case TSDataType.STRING: + case TSDataType.BLOB: + case TSDataType.DATE: + string stringValue = sessionDataSet.GetString(columnName); + Console.Write(stringValue); + break; + default: + break; + } + Console.Write("\t\t"); + } + Console.WriteLine(); + } + } + + static public void PrintDataSetByObject(SessionDataSet sessionDataSet) + { + IReadOnlyList columns = sessionDataSet.GetColumnNames(); + + foreach (string columnName in columns) + { + Console.Write($"{columnName}\t"); + } + Console.WriteLine(); + + while (sessionDataSet.HasNext()) + { + for (int i = 0; i < columns.Count; i++) + { + string columnName = columns[i]; + Console.Write(sessionDataSet.GetObject(columnName)); + Console.Write("\t\t"); + } + Console.WriteLine(); + } + } + + static public void PrintDataSetByString(SessionDataSet sessionDataSet) + { + IReadOnlyList columns = sessionDataSet.GetColumnNames(); + + foreach (string columnName in columns) + { + Console.Write($"{columnName}\t"); + } + Console.WriteLine(); + + while (sessionDataSet.HasNext()) + { + for (int i = 0; i < columns.Count; i++) + { + string columnName = columns[i]; + Console.Write(sessionDataSet.GetString(columnName)); + Console.Write("\t\t"); + } + Console.WriteLine(); + } + } } } diff --git a/src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs b/src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs index c2cac87..74280c9 100644 --- a/src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs +++ b/src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs @@ -66,7 +66,7 @@ private enum Keywords private string _password = "root"; private bool _enableRpcCompression = false; private int _fetchSize = 1800; - private string _zoneId = "UTC+08:00"; + private string _zoneId = "Asia/Shanghai"; private int _port = 6667; private int _poolSize = 8; private int _timeOut = 10000; @@ -413,7 +413,7 @@ private void Reset(Keywords index) _poolSize = 8; return; case Keywords.ZoneId: - _zoneId = "UTC+08:00"; + _zoneId = "Asia/Shanghai"; return; case Keywords.TimeOut: _timeOut = 10000;//10sec. diff --git a/src/Apache.IoTDB.Data/IoTDBDataReader.cs b/src/Apache.IoTDB.Data/IoTDBDataReader.cs index 156844e..8f96841 100644 --- a/src/Apache.IoTDB.Data/IoTDBDataReader.cs +++ b/src/Apache.IoTDB.Data/IoTDBDataReader.cs @@ -41,7 +41,7 @@ public class IoTDBDataReader : DbDataReader private bool _hasRows; private readonly int _recordsAffected; private bool _closed; - private readonly List _metas; + private IReadOnlyList _metas; private bool _closeConnection; private int _fieldCount; @@ -55,11 +55,12 @@ internal IoTDBDataReader(IoTDBCommand IoTDBCommand, SessionDataSet dataSet, bool _IoTDB = IoTDBCommand.Connection._IoTDB; _command = IoTDBCommand; _closeConnection = closeConnection; - _fieldCount = dataSet.ColumnNames.Count; - _hasRows = dataSet.RowCount > 0; - _recordsAffected = dataSet.RowCount; + _fieldCount = dataSet.GetColumnNames().Count; + _hasRows = dataSet.RowCount() > 0; + _recordsAffected = dataSet.RowCount(); + _closed = _closeConnection; - _metas = dataSet.ColumnNames; + _metas = dataSet.GetColumnNames(); _dataSet = dataSet; } @@ -456,7 +457,7 @@ public override DataTable GetSchemaTable() { if (_dataSet.HasNext()) { - rowdata = _dataSet.GetRow(); + rowdata = _dataSet.Next(); } var schemaTable = new DataTable("SchemaTable"); if (_metas != null && rowdata != null) diff --git a/src/Apache.IoTDB/Client.cs b/src/Apache.IoTDB/Client.cs index 03238d2..943cfa1 100644 --- a/src/Apache.IoTDB/Client.cs +++ b/src/Apache.IoTDB/Client.cs @@ -37,5 +37,23 @@ public Client(IClientRPCService.Client client, long sessionId, long statementId, Transport = transport; EndPoint = endpoint; } + + static public TSDataType GetDataTypeByStr(string typeStr) + { + return typeStr switch + { + "BOOLEAN" => TSDataType.BOOLEAN, + "INT32" => TSDataType.INT32, + "INT64" => TSDataType.INT64, + "FLOAT" => TSDataType.FLOAT, + "DOUBLE" => TSDataType.DOUBLE, + "TEXT" => TSDataType.TEXT, + "STRING" => TSDataType.STRING, + "BLOB" => TSDataType.BLOB, + "TIMESTAMP" => TSDataType.TIMESTAMP, + "DATE" => TSDataType.DATE, + _ => TSDataType.NONE + }; + } } } diff --git a/src/Apache.IoTDB/DataStructure/ByteBuffer.cs b/src/Apache.IoTDB/DataStructure/ByteBuffer.cs index a3e1823..7fade31 100644 --- a/src/Apache.IoTDB/DataStructure/ByteBuffer.cs +++ b/src/Apache.IoTDB/DataStructure/ByteBuffer.cs @@ -146,6 +146,16 @@ public byte[] GetBuffer() return _buffer[.._writePos]; } + public byte[] GetBytesByLengthh(int length) + { + if (_readPos + length > _buffer.Length) + throw new ArgumentOutOfRangeException(nameof(length), + $"Requested length ({length}) with current read position ({_readPos}) exceeds buffer size ({_buffer.Length})."); + var strBuff = _buffer[_readPos..(_readPos + length)]; + _readPos += length; + return strBuff; + } + private void ExtendBuffer(int spaceNeed) { if (_writePos + spaceNeed >= _totalLength) diff --git a/src/Apache.IoTDB/DataStructure/Column.cs b/src/Apache.IoTDB/DataStructure/Column.cs new file mode 100644 index 0000000..52d816f --- /dev/null +++ b/src/Apache.IoTDB/DataStructure/Column.cs @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Linq; + +namespace Apache.IoTDB.DataStructure +{ + public enum ColumnEncoding : byte + { + ByteArray, + Int32Array, + Int64Array, + BinaryArray, + Rle + } + + public class Binary + { + public byte[] Data { get; } + + public Binary(byte[] data) + { + Data = data; + } + } + + public interface Column + { + TSDataType GetDataType(); + ColumnEncoding GetEncoding(); + bool GetBoolean(int position); + int GetInt(int position); + long GetLong(int position); + float GetFloat(int position); + double GetDouble(int position); + Binary GetBinary(int position); + object GetObject(int position); + + bool[] GetBooleans(); + int[] GetInts(); + long[] GetLongs(); + float[] GetFloats(); + double[] GetDoubles(); + Binary[] GetBinaries(); + object[] GetObjects(); + + bool MayHaveNull(); + bool IsNull(int position); + bool[] GetNulls(); + + int GetPositionCount(); + } + + public abstract class BaseColumn : Column + { + public virtual TSDataType GetDataType() => throw new NotSupportedException(); + public virtual ColumnEncoding GetEncoding() => throw new NotSupportedException(); + public virtual bool GetBoolean(int position) => throw new NotSupportedException("GetBoolean not supported"); + public virtual int GetInt(int position) => throw new NotSupportedException("GetInt not supported"); + public virtual long GetLong(int position) => throw new NotSupportedException("GetLong not supported"); + public virtual float GetFloat(int position) => throw new NotSupportedException("GetFloat not supported"); + public virtual double GetDouble(int position) => throw new NotSupportedException("GetDouble not supported"); + public virtual Binary GetBinary(int position) => throw new NotSupportedException("GetBinary not supported"); + public virtual object GetObject(int position) => throw new NotSupportedException("GetObject not supported"); + + public virtual bool[] GetBooleans() => throw new NotSupportedException("GetBooleans not supported"); + public virtual int[] GetInts() => throw new NotSupportedException("GetInts not supported"); + public virtual long[] GetLongs() => throw new NotSupportedException("GetLongs not supported"); + public virtual float[] GetFloats() => throw new NotSupportedException("GetFloats not supported"); + public virtual double[] GetDoubles() => throw new NotSupportedException("GetDoubles not supported"); + public virtual Binary[] GetBinaries() => throw new NotSupportedException("GetBinaries not supported"); + public virtual object[] GetObjects() => throw new NotSupportedException("GetObjects not supported"); + + public virtual bool MayHaveNull() => false; + public virtual bool IsNull(int position) => false; + public virtual bool[] GetNulls() => new bool[GetPositionCount()]; + public abstract int GetPositionCount(); + } + + public abstract class PrimitiveColumn : BaseColumn + { + protected readonly T[] _values; + protected readonly int _arrayOffset; + protected readonly int _positionCount; + protected readonly bool[] _valueIsNull; + + private readonly TSDataType _dataType; + private readonly ColumnEncoding _encoding; + + protected PrimitiveColumn( + TSDataType dataType, + ColumnEncoding encoding, + int arrayOffset, + int positionCount, + bool[] valueIsNull, + T[] values) + { + if (arrayOffset < 0) + throw new ArgumentException("arrayOffset is negative"); + if (positionCount < 0) + throw new ArgumentException("positionCount is negative"); + if (values == null || values.Length - arrayOffset < positionCount) + throw new ArgumentException("values array is too short"); + if (valueIsNull != null && valueIsNull.Length - arrayOffset < positionCount) + throw new ArgumentException("isNull array is too short"); + + _dataType = dataType; + _encoding = encoding; + _arrayOffset = arrayOffset; + _positionCount = positionCount; + _valueIsNull = valueIsNull; + _values = values; + } + + public override TSDataType GetDataType() => _dataType; + public override ColumnEncoding GetEncoding() => _encoding; + + public override bool MayHaveNull() => _valueIsNull != null; + public override bool IsNull(int position) => _valueIsNull?[position + _arrayOffset] ?? false; + public override bool[] GetNulls() + { + if (_valueIsNull == null) + return new bool[_positionCount]; + + return _valueIsNull.Skip(_arrayOffset).Take(_positionCount).ToArray(); + } + + public override int GetPositionCount() => _positionCount; + } + + public class TimeColumn : PrimitiveColumn + { + public TimeColumn(int arrayOffset, int positionCount, long[] values) + : base( + dataType: TSDataType.INT64, + encoding: ColumnEncoding.Int64Array, + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: null, + values: values) + { } + + public override long GetLong(int position) => _values[position + _arrayOffset]; + public override long[] GetLongs() => _values.Skip(_arrayOffset).Take(_positionCount).ToArray(); + public override object GetObject(int position) => GetLong(position); + public override object[] GetObjects() => GetLongs().Cast().ToArray(); + + public long GetStartTime() => _values[_arrayOffset]; + public long GetEndTime() => _values[_arrayOffset + _positionCount - 1]; + public long[] GetTimes() => _values.Skip(_arrayOffset).Take(_positionCount).ToArray(); + } + + public class BinaryColumn : PrimitiveColumn + { + public BinaryColumn(int arrayOffset, int positionCount, bool[] valueIsNull, Binary[] values) + : base( + TSDataType.TEXT, + ColumnEncoding.BinaryArray, + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values) + { } + + public override Binary GetBinary(int position) => _values[position + _arrayOffset]; + public override Binary[] GetBinaries() => _values.Skip(_arrayOffset).Take(_positionCount).ToArray(); + public override object GetObject(int position) => GetBinary(position); + public override object[] GetObjects() => GetBinaries().Cast().ToArray(); + } + + public class IntColumn : PrimitiveColumn + { + public IntColumn(int arrayOffset, int positionCount, bool[] valueIsNull, int[] values) + : base( + TSDataType.INT32, + ColumnEncoding.Int32Array, + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values) + { } + + public override int GetInt(int position) => _values[position + _arrayOffset]; + public override int[] GetInts() => _values.Skip(_arrayOffset).Take(_positionCount).ToArray(); + public override object GetObject(int position) => GetInt(position); + public override object[] GetObjects() => GetInts().Cast().ToArray(); + } + + public class FloatColumn : PrimitiveColumn + { + public FloatColumn(int arrayOffset, int positionCount, bool[] valueIsNull, float[] values) + : base( + TSDataType.FLOAT, + ColumnEncoding.Int32Array, + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values) + { } + + public override float GetFloat(int position) => _values[position + _arrayOffset]; + public override float[] GetFloats() => _values.Skip(_arrayOffset).Take(_positionCount).ToArray(); + public override object GetObject(int position) => GetFloat(position); + public override object[] GetObjects() => GetFloats().Cast().ToArray(); + } + + public class LongColumn : PrimitiveColumn + { + public LongColumn(int arrayOffset, int positionCount, bool[] valueIsNull, long[] values) + : base( + TSDataType.INT64, + ColumnEncoding.Int64Array, + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values) + { } + + public override long GetLong(int position) => _values[position + _arrayOffset]; + public override long[] GetLongs() => _values.Skip(_arrayOffset).Take(_positionCount).ToArray(); + public override object GetObject(int position) => GetLong(position); + public override object[] GetObjects() => GetLongs().Cast().ToArray(); + } + + public class DoubleColumn : PrimitiveColumn + { + public DoubleColumn(int arrayOffset, int positionCount, bool[] valueIsNull, double[] values) + : base( + TSDataType.DOUBLE, + ColumnEncoding.Int64Array, + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values) + { } + + public override double GetDouble(int position) => _values[position + _arrayOffset]; + public override double[] GetDoubles() => _values.Skip(_arrayOffset).Take(_positionCount).ToArray(); + public override object GetObject(int position) => GetDouble(position); + public override object[] GetObjects() => GetDoubles().Cast().ToArray(); + } + + public class BooleanColumn : PrimitiveColumn + { + public BooleanColumn(int arrayOffset, int positionCount, bool[] valueIsNull, bool[] values) + : base( + TSDataType.BOOLEAN, + ColumnEncoding.ByteArray, + arrayOffset: arrayOffset, + positionCount: positionCount, + valueIsNull: valueIsNull, + values: values) + { } + + public override bool GetBoolean(int position) => _values[position + _arrayOffset]; + public override bool[] GetBooleans() => _values.Skip(_arrayOffset).Take(_positionCount).ToArray(); + public override object GetObject(int position) => GetBoolean(position); + public override object[] GetObjects() => GetBooleans().Cast().ToArray(); + } + + public class RunLengthEncodedColumn : BaseColumn + { + private readonly Column _value; + private readonly int _positionCount; + + public RunLengthEncodedColumn(Column value, int positionCount) + { + if (value == null) + throw new ArgumentNullException(nameof(value)); + if (value.GetPositionCount() != 1) + throw new ArgumentException("Expected value to contain a single position"); + if (positionCount < 0) + throw new ArgumentException("positionCount is negative"); + + // Unwrap nested RLE columns + _value = value is RunLengthEncodedColumn rle ? rle.Value : value; + _positionCount = positionCount; + } + + public Column Value => _value; + + public override TSDataType GetDataType() => _value.GetDataType(); + public override ColumnEncoding GetEncoding() => ColumnEncoding.Rle; + + public override bool GetBoolean(int position) => _value.GetBoolean(0); + public override int GetInt(int position) => _value.GetInt(0); + public override long GetLong(int position) => _value.GetLong(0); + public override float GetFloat(int position) => _value.GetFloat(0); + public override double GetDouble(int position) => _value.GetDouble(0); + public override Binary GetBinary(int position) => _value.GetBinary(0); + public override object GetObject(int position) => _value.GetObject(0); + + public override bool[] GetBooleans() => Enumerable.Repeat(_value.GetBoolean(0), _positionCount).ToArray(); + public override int[] GetInts() => Enumerable.Repeat(_value.GetInt(0), _positionCount).ToArray(); + public override long[] GetLongs() => Enumerable.Repeat(_value.GetLong(0), _positionCount).ToArray(); + public override float[] GetFloats() => Enumerable.Repeat(_value.GetFloat(0), _positionCount).ToArray(); + public override double[] GetDoubles() => Enumerable.Repeat(_value.GetDouble(0), _positionCount).ToArray(); + public override Binary[] GetBinaries() => Enumerable.Repeat(_value.GetBinary(0), _positionCount).ToArray(); + public override object[] GetObjects() => Enumerable.Repeat(_value.GetObject(0), _positionCount).ToArray(); + + public override bool MayHaveNull() => _value.MayHaveNull(); + public override bool IsNull(int position) => _value.IsNull(0); + public override bool[] GetNulls() => Enumerable.Repeat(_value.IsNull(0), _positionCount).ToArray(); + + public override int GetPositionCount() => _positionCount; + } +} diff --git a/src/Apache.IoTDB/DataStructure/ColumnDecoder.cs b/src/Apache.IoTDB/DataStructure/ColumnDecoder.cs new file mode 100644 index 0000000..a903498 --- /dev/null +++ b/src/Apache.IoTDB/DataStructure/ColumnDecoder.cs @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; + +namespace Apache.IoTDB.DataStructure +{ + public interface ColumnDecoder + { + Column ReadColumn(ByteBuffer reader, TSDataType dataType, int positionCount); + } + + public class BaseColumnDecoder + { + + private static Dictionary decoders = new Dictionary + { + { ColumnEncoding.Int32Array, new Int32ArrayColumnDecoder() }, + { ColumnEncoding.Int64Array, new Int64ArrayColumnDecoder() }, + { ColumnEncoding.ByteArray, new ByteArrayColumnDecoder() }, + { ColumnEncoding.BinaryArray, new BinaryArrayColumnDecoder() }, + { ColumnEncoding.Rle, new RunLengthColumnDecoder() } + }; + + public static ColumnDecoder GetDecoder(ColumnEncoding encoding) + { + if (decoders.TryGetValue(encoding, out var decoder)) + return decoder; + throw new ArgumentException($"Unsupported encoding: {encoding}"); + } + + public static ColumnEncoding DeserializeColumnEncoding(ByteBuffer reader) + { + return (ColumnEncoding)reader.GetByte(); + } + } + + public static class ColumnDeserializer + { + public static bool[] DeserializeNullIndicators(ByteBuffer reader, int positionCount) + { + byte b = reader.GetByte(); + bool mayHaveNull = b != 0; + if (!mayHaveNull) + return null; + return DeserializeBooleanArray(reader, positionCount); + } + + public static bool[] DeserializeBooleanArray(ByteBuffer reader, int size) + { + int packedSize = (size + 7) / 8; + byte[] packedBytes = reader.GetBytesByLengthh(packedSize); + if (packedBytes.Length < packedSize) + throw new InvalidDataException( + $"Boolean array decoding failed: expected {packedSize} bytes for {size} bits, but only received {packedBytes.Length} bytes from buffer." + ); + bool[] output = new bool[size]; + int currentByte = 0; + int fullGroups = size & ~0b111; + + for (int pos = 0; pos < fullGroups; pos += 8) + { + byte b = packedBytes[currentByte++]; + output[pos + 0] = (b & 0b10000000) != 0; + output[pos + 1] = (b & 0b01000000) != 0; + output[pos + 2] = (b & 0b00100000) != 0; + output[pos + 3] = (b & 0b00010000) != 0; + output[pos + 4] = (b & 0b00001000) != 0; + output[pos + 5] = (b & 0b00000100) != 0; + output[pos + 6] = (b & 0b00000010) != 0; + output[pos + 7] = (b & 0b00000001) != 0; + } + + if (size % 8 != 0) + { + byte b = packedBytes[packedSize - 1]; + byte mask = 0b10000000; + for (int pos = fullGroups; pos < size; pos++) + { + output[pos] = (b & mask) != 0; + mask >>= 1; + } + } + + return output; + } + } + + public class Int32ArrayColumnDecoder : ColumnDecoder + { + public Column ReadColumn(ByteBuffer reader, TSDataType dataType, int positionCount) + { + bool[] nullIndicators = ColumnDeserializer.DeserializeNullIndicators(reader, positionCount); + + switch (dataType) + { + case TSDataType.INT32: + case TSDataType.DATE: + int[] intValues = new int[positionCount]; + for (int i = 0; i < positionCount; i++) + { + if (nullIndicators != null && nullIndicators[i]) + continue; + intValues[i] = reader.GetInt(); + } + return new IntColumn(0, positionCount, nullIndicators, intValues); + case TSDataType.FLOAT: + float[] floatValues = new float[positionCount]; + for (int i = 0; i < positionCount; i++) + { + if (nullIndicators != null && nullIndicators[i]) + continue; + floatValues[i] = reader.GetFloat(); + } + return new FloatColumn(0, positionCount, nullIndicators, floatValues); + default: + throw new ArgumentException($"Invalid data type: {dataType}"); + } + } + } + + public class Int64ArrayColumnDecoder : ColumnDecoder + { + public Column ReadColumn(ByteBuffer reader, TSDataType dataType, int positionCount) + { + bool[] nullIndicators = ColumnDeserializer.DeserializeNullIndicators(reader, positionCount); + + switch (dataType) + { + case TSDataType.INT64: + case TSDataType.TIMESTAMP: + long[] longValues = new long[positionCount]; + for (int i = 0; i < positionCount; i++) + { + if (nullIndicators != null && nullIndicators[i]) + continue; + longValues[i] = reader.GetLong(); + } + return new LongColumn(0, positionCount, nullIndicators, longValues); + case TSDataType.DOUBLE: + double[] doubleValues = new double[positionCount]; + for (int i = 0; i < positionCount; i++) + { + if (nullIndicators != null && nullIndicators[i]) + continue; + doubleValues[i] = reader.GetDouble(); + } + return new DoubleColumn(0, positionCount, nullIndicators, doubleValues); + default: + throw new ArgumentException($"Invalid data type: {dataType}"); + } + } + } + + public class ByteArrayColumnDecoder : ColumnDecoder + { + public Column ReadColumn(ByteBuffer reader, TSDataType dataType, int positionCount) + { + if (dataType != TSDataType.BOOLEAN) + throw new ArgumentException($"Invalid data type: {dataType}"); + + bool[] nullIndicators = ColumnDeserializer.DeserializeNullIndicators(reader, positionCount); + bool[] values = ColumnDeserializer.DeserializeBooleanArray(reader, positionCount); + return new BooleanColumn(0, positionCount, nullIndicators, values); + } + } + + public class BinaryArrayColumnDecoder : ColumnDecoder + { + public Column ReadColumn(ByteBuffer reader, TSDataType dataType, int positionCount) + { + if (dataType != TSDataType.TEXT) + throw new ArgumentException($"Invalid data type: {dataType}"); + + bool[] nullIndicators = ColumnDeserializer.DeserializeNullIndicators(reader, positionCount); + Binary[] values = new Binary[positionCount]; + + for (int i = 0; i < positionCount; i++) + { + if (nullIndicators != null && nullIndicators[i]) + continue; + int length = reader.GetInt(); + byte[] value = reader.GetBytesByLengthh(length); + values[i] = new Binary(value); + } + + return new BinaryColumn(0, positionCount, nullIndicators, values); + } + } + + public class RunLengthColumnDecoder : ColumnDecoder + { + public Column ReadColumn(ByteBuffer reader, TSDataType dataType, int positionCount) + { + ColumnEncoding encoding = BaseColumnDecoder.DeserializeColumnEncoding(reader); + ColumnDecoder decoder = BaseColumnDecoder.GetDecoder(encoding); + Column column = decoder.ReadColumn(reader, dataType, 1); + return new RunLengthEncodedColumn(column, positionCount); + } + } +} diff --git a/src/Apache.IoTDB/DataStructure/RpcDataSet.cs b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs new file mode 100644 index 0000000..681b0c1 --- /dev/null +++ b/src/Apache.IoTDB/DataStructure/RpcDataSet.cs @@ -0,0 +1,847 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Thrift; + +namespace Apache.IoTDB.DataStructure +{ + public class RpcDataSet : System.IDisposable + { + private const string TimestampColumnName = "Time"; + private const string DefaultTimeFormat = "yyyy-MM-dd HH:mm:ss.fff"; + + private readonly string _sql; + private bool _isClosed; + private readonly Client _client; + public readonly List _columnNameList = new List(); + public readonly List _columnTypeList = new List(); + private readonly Dictionary _columnOrdinalMap = new Dictionary(); + private readonly Dictionary _columnName2TsBlockColumnIndexMap = new Dictionary(); + private readonly List _columnIndex2TsBlockColumnIndexList = new List(); + private readonly TSDataType[] _dataTypeForTsBlockColumn; + private int _fetchSize; + private long _timeout; + private bool _hasCachedRecord; + private bool _lastReadWasNull; + + private int _columnSize; + private long _sessionId; + private long _queryId; + private long _statementId; + private long _time; + private bool _ignoreTimestamp; + private bool _moreData; + + private List _queryResult; + private TsBlock _curTsBlock; + private int _queryResultSize; + private int _queryResultIndex; + public int _tsBlockSize; + private int _tsBlockIndex; + private TimeZoneInfo _zoneId; + private int _timeFactor; + private string _timePrecision; + private bool disposedValue; + + public RpcDataSet(string sql, List columnNameList, List columnTypeList, + Dictionary columnNameIndex, bool ignoreTimestamp, bool moreData, long queryId, + long statementId, Client client, long sessionId, List queryResult, int fetchSize, + long timeout, string zoneId, List columnIndex2TsBlockColumnIndexList) + { + _sql = sql; + _client = client; + _fetchSize = fetchSize; + _timeout = timeout; + _moreData = moreData; + _columnSize = columnNameList.Count; + _sessionId = sessionId; + _queryId = queryId; + _statementId = statementId; + _ignoreTimestamp = ignoreTimestamp; + + int columnStartIndex = 1; + int resultSetColumnSize = columnNameList.Count; + int startIndexForColumnIndex2TsBlockColumnIndexList = 0; + + if (!_ignoreTimestamp) + { + _columnNameList.Add(TimestampColumnName); + _columnTypeList.Add("INT64"); + _columnName2TsBlockColumnIndexMap[TimestampColumnName] = -1; + _columnOrdinalMap[TimestampColumnName] = 1; + + if (columnIndex2TsBlockColumnIndexList != null) + { + columnIndex2TsBlockColumnIndexList.Insert(0, -1); + startIndexForColumnIndex2TsBlockColumnIndexList = 1; + } + columnStartIndex++; + resultSetColumnSize++; + } + + _columnNameList.AddRange(columnNameList); + _columnTypeList.AddRange(columnTypeList); + + if (columnIndex2TsBlockColumnIndexList == null) + { + columnIndex2TsBlockColumnIndexList = new List(); + if (!_ignoreTimestamp) + { + startIndexForColumnIndex2TsBlockColumnIndexList = 1; + columnIndex2TsBlockColumnIndexList.Add(-1); + } + for (int i = 0; i < columnNameList.Count; i++) + columnIndex2TsBlockColumnIndexList.Add(i); + } + + int tsBlockColumnSize = columnIndex2TsBlockColumnIndexList.Max() + 1; + _dataTypeForTsBlockColumn = new TSDataType[tsBlockColumnSize]; + + for (int i = 0; i < columnNameList.Count; i++) + { + int tsBlockColumnIndex = columnIndex2TsBlockColumnIndexList[startIndexForColumnIndex2TsBlockColumnIndexList + i]; + if (tsBlockColumnIndex != -1) + { + TSDataType columnType = Client.GetDataTypeByStr(columnTypeList[i]); + _dataTypeForTsBlockColumn[tsBlockColumnIndex] = columnType; + } + + if (!_columnName2TsBlockColumnIndexMap.ContainsKey(columnNameList[i])) + { + _columnOrdinalMap[columnNameList[i]] = i + columnStartIndex; + _columnName2TsBlockColumnIndexMap[columnNameList[i]] = tsBlockColumnIndex; + } + } + + _queryResult = queryResult; + _queryResultSize = queryResult?.Count ?? 0; + _queryResultIndex = 0; + _tsBlockSize = 0; + _tsBlockIndex = -1; + + _zoneId = TimeZoneInfo.FindSystemTimeZoneById(zoneId); + + if (columnIndex2TsBlockColumnIndexList.Count != _columnNameList.Count) + throw new ArgumentException("Column index list size mismatch"); + + _columnIndex2TsBlockColumnIndexList = columnIndex2TsBlockColumnIndexList; + } + + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + try + { + this.Close().Wait(); + } + catch + { + } + } + disposedValue = true; + } + } + + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + + public async Task Close() + { + if (_isClosed) return; + + var closeRequest = new TSCloseOperationReq + { + SessionId = _sessionId, + StatementId = _statementId, + QueryId = _queryId + }; + + try + { + var status = await _client.ServiceClient.closeOperationAsync(closeRequest); + } + catch (TException e) + { + throw new TException("Operation Handle Close Failed", e); + } + _isClosed = true; + } + + public bool Next() + { + if (HasCachedBlock()) + { + _lastReadWasNull = false; + ConstructOneRow(); + return true; + } + + if (HasCachedByteBuffer()) + { + ConstructOneTsBlock(); + ConstructOneRow(); + return true; + } + + if (_moreData) + { + bool hasResultSet = FetchResults(); + if (hasResultSet && HasCachedByteBuffer()) + { + ConstructOneTsBlock(); + ConstructOneRow(); + return true; + } + } + + Close().Wait(); + return false; + } + + private bool FetchResults() + { + if (_isClosed) + throw new InvalidOperationException("Dataset closed"); + + var req = new TSFetchResultsReq + { + SessionId = _sessionId, + Statement = _sql, + FetchSize = _fetchSize, + QueryId = _queryId, + IsAlign = true, + Timeout = _timeout + }; + + try + { + var task = _client.ServiceClient.fetchResultsV2Async(req); + + var resp = task.ConfigureAwait(false).GetAwaiter().GetResult(); + + if (!resp.HasResultSet) + { + Close().Wait(); + return false; + } + + // return _queryResult != null && _queryResultIndex < _queryResultSize; + _queryResult = resp.QueryResult; + _queryResultIndex = 0; + _queryResultSize = _queryResult?.Count ?? 0; + _tsBlockSize = 0; + _tsBlockIndex = -1; + return true; + } + catch (TException e) + { + throw new TException("Cannot fetch result from server, because of network connection", e); + } + } + + private bool HasCachedBlock() + { + return _curTsBlock != null && _tsBlockIndex < _tsBlockSize - 1; + } + + public bool HasCachedByteBuffer() + { + return _queryResult != null && _queryResultIndex < _queryResultSize; + } + + private void ConstructOneRow() + { + _tsBlockIndex++; + _hasCachedRecord = true; + _time = _curTsBlock.GetTimeByIndex(_tsBlockIndex); + } + + private void ConstructOneTsBlock() + { + _lastReadWasNull = false; + byte[] curTsBlockBytes = _queryResult[_queryResultIndex]; + + _queryResultIndex++; + _curTsBlock = TsBlock.Deserialize(new ByteBuffer(curTsBlockBytes)); + _tsBlockIndex = -1; + _tsBlockSize = _curTsBlock.PositionCount; + } + + public bool IsIgnoredTimestamp => _ignoreTimestamp; + + public bool IsNullByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return IsNull(index, _tsBlockIndex); + } + + public bool IsNullByColumnName(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return IsNull(index, _tsBlockIndex); + } + + private bool IsNull(int index, int rowNum) + { + return index >= 0 && _curTsBlock.GetColumn(index).IsNull(rowNum); + } + + public bool GetBooleanByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetBooleanByTsBlockColumnIndex(index); + } + + public bool GetBoolean(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetBooleanByTsBlockColumnIndex(index); + } + + private bool GetBooleanByTsBlockColumnIndex(int tsBlockColumnIndex) + { + CheckRecord(); + if (!IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + _lastReadWasNull = false; + return _curTsBlock.GetColumn(tsBlockColumnIndex).GetBoolean(_tsBlockIndex); + } + else + { + _lastReadWasNull = true; + return false; + } + } + + public double GetDoubleByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetDoubleByTsBlockColumnIndex(index); + } + + public double GetDouble(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetDoubleByTsBlockColumnIndex(index); + } + + private double GetDoubleByTsBlockColumnIndex(int tsBlockColumnIndex) + { + CheckRecord(); + if (!IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + _lastReadWasNull = false; + return _curTsBlock.GetColumn(tsBlockColumnIndex).GetDouble(_tsBlockIndex); + } + else + { + _lastReadWasNull = true; + return 0.0; + } + } + + public float GetFloatByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetFloatByTsBlockColumnIndex(index); + } + + public float GetFloat(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetFloatByTsBlockColumnIndex(index); + } + + private float GetFloatByTsBlockColumnIndex(int tsBlockColumnIndex) + { + CheckRecord(); + if (!IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + _lastReadWasNull = false; + return _curTsBlock.GetColumn(tsBlockColumnIndex).GetFloat(_tsBlockIndex); + } + else + { + _lastReadWasNull = true; + return 0.0f; + } + } + + public int GetIntByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetIntByTsBlockColumnIndex(index); + } + + public int GetInt(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetIntByTsBlockColumnIndex(index); + } + + private int GetIntByTsBlockColumnIndex(int tsBlockColumnIndex) + { + CheckRecord(); + if (!IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + _lastReadWasNull = false; + TSDataType dataType = _curTsBlock.GetColumn(tsBlockColumnIndex).GetDataType(); + if (dataType == TSDataType.INT64) + { + long v = _curTsBlock.GetColumn(tsBlockColumnIndex).GetLong(_tsBlockIndex); + return (int)v; + } + return _curTsBlock.GetColumn(tsBlockColumnIndex).GetInt(_tsBlockIndex); + } + else + { + _lastReadWasNull = true; + return 0; + } + } + + public long GetLongByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetLongByTsBlockColumnIndex(index); + } + + public long GetLong(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetLongByTsBlockColumnIndex(index); + } + + private long GetLongByTsBlockColumnIndex(int tsBlockColumnIndex) + { + CheckRecord(); + if (!IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + if (tsBlockColumnIndex == -1) return _curTsBlock.GetTimeByIndex(_tsBlockIndex); + _lastReadWasNull = false; + return _curTsBlock.GetColumn(tsBlockColumnIndex).GetLong(_tsBlockIndex); + } + else + { + _lastReadWasNull = true; + return 0L; + } + } + + public Binary GetBinaryByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetBinaryByTsBlockColumnIndex(index); + } + + public Binary GetBinary(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetBinaryByTsBlockColumnIndex(index); + } + + private Binary GetBinaryByTsBlockColumnIndex(int tsBlockColumnIndex) + { + CheckRecord(); + if (!IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + _lastReadWasNull = false; + return _curTsBlock.GetColumn(tsBlockColumnIndex).GetBinary(_tsBlockIndex); + } + else + { + _lastReadWasNull = true; + return null; + } + } + + public object GetObjectByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetObjectByTsBlockIndex(index); + } + + public object GetObject(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetObjectByTsBlockIndex(index); + } + + private object GetObjectByTsBlockIndex(int tsBlockColumnIndex) + { + CheckRecord(); + if (IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + _lastReadWasNull = true; + return null; + } + + _lastReadWasNull = false; + TSDataType dataType = GetDataTypeByTsBlockColumnIndex(tsBlockColumnIndex); + + switch (dataType) + { + case TSDataType.BOOLEAN: + case TSDataType.INT32: + case TSDataType.INT64: + case TSDataType.FLOAT: + case TSDataType.DOUBLE: + return _curTsBlock.GetColumn(tsBlockColumnIndex).GetObject(_tsBlockIndex); + + case TSDataType.TIMESTAMP: + long timestamp = tsBlockColumnIndex == -1 + ? _curTsBlock.GetTimeByIndex(_tsBlockIndex) + : _curTsBlock.GetColumn(tsBlockColumnIndex).GetLong(_tsBlockIndex); + return ConvertToTimestamp(timestamp, _timeFactor); + + case TSDataType.TEXT: + case TSDataType.STRING: + Binary binaryStr = _curTsBlock.GetColumn(tsBlockColumnIndex).GetBinary(_tsBlockIndex); + return binaryStr != null ? Encoding.UTF8.GetString(binaryStr.Data) : null; + + case TSDataType.BLOB: + return _curTsBlock.GetColumn(tsBlockColumnIndex).GetBinary(_tsBlockIndex); + + case TSDataType.DATE: + int value = _curTsBlock.GetColumn(tsBlockColumnIndex).GetInt(_tsBlockIndex); + return Int32ToDate(value); + + default: + return null; + } + } + + public string GetStringByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetStringByTsBlockColumnIndex(index); + } + + public string GetString(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetStringByTsBlockColumnIndex(index); + } + + private string GetStringByTsBlockColumnIndex(int tsBlockColumnIndex) + { + CheckRecord(); + + if (tsBlockColumnIndex == -1) + { + long timestamp = _curTsBlock.GetTimeByIndex(_tsBlockIndex); + return timestamp.ToString(); + } + + if (IsNull(tsBlockColumnIndex, _tsBlockIndex)) + { + _lastReadWasNull = true; + Console.WriteLine("null"); + return string.Empty; + } + + _lastReadWasNull = false; + return GetStringByTsBlockColumnIndexAndDataType( + tsBlockColumnIndex, + GetDataTypeByTsBlockColumnIndex(tsBlockColumnIndex)); + } + + private string GetStringByTsBlockColumnIndexAndDataType(int index, TSDataType tsDataType) + { + switch (tsDataType) + { + case TSDataType.BOOLEAN: + bool boolVal = _curTsBlock.GetColumn(index).GetBoolean(_tsBlockIndex); + return boolVal.ToString(); + + case TSDataType.INT32: + int intVal = _curTsBlock.GetColumn(index).GetInt(_tsBlockIndex); + return intVal.ToString(); + + case TSDataType.INT64: + long longVal = _curTsBlock.GetColumn(index).GetLong(_tsBlockIndex); + return longVal.ToString(); + + case TSDataType.TIMESTAMP: + long tsValue = _curTsBlock.GetColumn(index).GetLong(_tsBlockIndex); + return FormatDatetime(DefaultTimeFormat, _timePrecision, tsValue, _zoneId); + + case TSDataType.FLOAT: + float floatVal = _curTsBlock.GetColumn(index).GetFloat(_tsBlockIndex); + return floatVal.ToString("G9"); + + case TSDataType.DOUBLE: + double doubleVal = _curTsBlock.GetColumn(index).GetDouble(_tsBlockIndex); + return doubleVal.ToString("G17"); + + case TSDataType.TEXT: + case TSDataType.STRING: + Binary strBytes = _curTsBlock.GetColumn(index).GetBinary(_tsBlockIndex); + return strBytes != null ? Encoding.UTF8.GetString(strBytes.Data) : "0"; + + case TSDataType.BLOB: + Binary blobBytes = _curTsBlock.GetColumn(index).GetBinary(_tsBlockIndex); + return blobBytes.ToString().Replace("-", ""); + + case TSDataType.DATE: + int dateValue = _curTsBlock.GetColumn(index).GetInt(_tsBlockIndex); + DateTime date = Int32ToDate(dateValue); + return date.ToString("yyyy-MM-dd"); + + default: + return string.Empty; + } + } + + public RowRecord GetRow() + { + IReadOnlyList columns = _columnNameList; + int i = 0; + List fieldList = new List(); + long timestamp = 0; + foreach (string columnName in columns) + { + object localfield; + string typeStr = _columnTypeList[i]; + TSDataType dataType = Client.GetDataTypeByStr(typeStr); + + switch (dataType) + { + case TSDataType.BOOLEAN: + localfield = GetBoolean(columnName); + break; + case TSDataType.INT32: + localfield = GetInt(columnName); + break; + case TSDataType.INT64: + localfield = GetLong(columnName); + break; + case TSDataType.TIMESTAMP: + localfield = null; + timestamp = GetLong(columnName); + break; + case TSDataType.FLOAT: + localfield = GetFloat(columnName); + break; + case TSDataType.DOUBLE: + localfield = GetDouble(columnName); + break; + case TSDataType.TEXT: + case TSDataType.STRING: + case TSDataType.BLOB: + case TSDataType.DATE: + localfield = GetString(columnName); + break; + default: + string err_msg = "value format not supported"; + throw new TException(err_msg, null); + } + if (localfield != null) + fieldList.Add(localfield); + i += 1; + } + return new RowRecord(timestamp, fieldList, _columnNameList); + } + + public DateTime GetTimestampByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetTimestampByTsBlockColumnIndex(index); + } + + public DateTime GetTimestamp(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetTimestampByTsBlockColumnIndex(index); + } + + private DateTime GetTimestampByTsBlockColumnIndex(int tsBlockColumnIndex) + { + long value = GetLongByTsBlockColumnIndex(tsBlockColumnIndex); + return ConvertToTimestamp(value, _timeFactor); + } + + public DateTime GetDateByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetDateByTsBlockColumnIndex(index); + } + + public DateTime GetDate(string columnName) + { + int index = GetTsBlockColumnIndexForColumnName(columnName); + return GetDateByTsBlockColumnIndex(index); + } + + private DateTime GetDateByTsBlockColumnIndex(int tsBlockColumnIndex) + { + int value = GetIntByTsBlockColumnIndex(tsBlockColumnIndex); + return Int32ToDate(value); + } + + public TSDataType GetDataTypeByIndex(int columnIndex) + { + int index = GetTsBlockColumnIndexForColumnIndex(columnIndex); + return GetDataTypeByTsBlockColumnIndex(index); + } + + public TSDataType GetDataType(string columnName) + { + if (!_columnName2TsBlockColumnIndexMap.TryGetValue(columnName, out int index)) + throw new ArgumentException($"Column {columnName} not found"); + + return GetDataTypeByTsBlockColumnIndex(index); + } + + private TSDataType GetDataTypeByTsBlockColumnIndex(int tsBlockColumnIndex) + { + return tsBlockColumnIndex < 0 + ? TSDataType.TIMESTAMP + : _dataTypeForTsBlockColumn[tsBlockColumnIndex]; + } + + private DateTime ConvertToTimestamp(long value, double timeFactor) + { + long timestamp = (long)(value * timeFactor); + return DateTimeOffset.FromUnixTimeMilliseconds(timestamp).DateTime; + } + + public static DateTime Int32ToDate(int val) + { + int year = val / 10000; + int remaining = val % 10000; + int month = remaining / 100; + int day = remaining % 100; + + if (year < 1 || year > 9999) + throw new ArgumentOutOfRangeException( + paramName: nameof(val), + message: $"Invalid year value: {year}. Year must be between 1-9999" + ); + + if (month < 1 || month > 12) + throw new ArgumentOutOfRangeException( + paramName: nameof(val), + message: $"Invalid month value: {month}. Month must be between 1-12" + ); + + int daysInMonth = DateTime.DaysInMonth(year, month); + if (day < 1 || day > daysInMonth) + throw new ArgumentOutOfRangeException( + paramName: nameof(val), + message: $"Invalid day value: {day}. Day must be between 1-{daysInMonth} for {year}-{month}" + ); + + return new DateTime(year, month, day, 0, 0, 0, DateTimeKind.Utc); + } + + private string FormatDatetime(string format, string precision, long value, TimeZoneInfo zone) + { + DateTime dt = ConvertToTimestamp(value, 1); // 假设timeFactor=1 + DateTime convertedTime = TimeZoneInfo.ConvertTime(dt, zone); + return convertedTime.ToString(format); + } + + private int GetTsBlockColumnIndexForColumnName(string columnName) + { + if (!_columnName2TsBlockColumnIndexMap.TryGetValue(columnName, out int index)) + throw new ArgumentException($"Column {columnName} not found"); + return index; + } + + public int FindColumn(string columnName) + { + if (!_columnOrdinalMap.TryGetValue(columnName, out int ordinal)) + throw new ArgumentException($"Column {columnName} not found"); + return ordinal; + } + + public string FindColumnNameByIndex(int columnIndex) + { + if (columnIndex <= 0) + throw new ArgumentOutOfRangeException(nameof(columnIndex), "Column index should start from 1"); + + if (columnIndex > _columnNameList.Count) + throw new ArgumentOutOfRangeException(nameof(columnIndex), + $"Column index {columnIndex} out of range {_columnNameList.Count}"); + + return _columnNameList[columnIndex - 1]; + } + + private int GetTsBlockColumnIndexForColumnIndex(int columnIndex) + { + int adjustedIndex = columnIndex - 1; + if (adjustedIndex < 0 || adjustedIndex >= _columnIndex2TsBlockColumnIndexList.Count) + throw new ArgumentOutOfRangeException(nameof(columnIndex), + $"Index {adjustedIndex} out of range {_columnIndex2TsBlockColumnIndexList.Count}"); + + return _columnIndex2TsBlockColumnIndexList[adjustedIndex]; + } + + private void CheckRecord() + { + if (_queryResultIndex > _queryResultSize || + _tsBlockIndex >= _tsBlockSize || + _queryResult == null || + _curTsBlock == null) + { + throw new InvalidOperationException("No record remains"); + } + } + + public int GetValueColumnStartIndex() => _ignoreTimestamp ? 0 : 1; + + public int GetColumnSize() => _columnNameList.Count; + + public List GetColumnTypeList() => new List(_columnTypeList); + + public List GetColumnNameTypeList() => new List(_columnTypeList); + + public bool IsClosed() => _isClosed; + + public int FetchSize + { + get => _fetchSize; + set => _fetchSize = value; + } + + public bool HasCachedRecord + { + get => _hasCachedRecord; + set => _hasCachedRecord = value; + } + + public bool IsLastReadWasNull() => _lastReadWasNull; + + public long GetCurrentRowTime() => _time; + + public bool IsIgnoreTimestamp() => _ignoreTimestamp; + } +} diff --git a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs index 3d606f4..5489496 100644 --- a/src/Apache.IoTDB/DataStructure/SessionDataSet.cs +++ b/src/Apache.IoTDB/DataStructure/SessionDataSet.cs @@ -32,284 +32,92 @@ public class SessionDataSet : System.IDisposable private readonly string _sql; private readonly List _columnNames; private readonly Dictionary _columnNameIndexMap; - private readonly Dictionary _duplicateLocation; private readonly List _columnTypeLst; - private TSQueryDataSet _queryDataset; - private readonly byte[] _currentBitmap; - private readonly int _columnSize; - private List _valueBufferLst, _bitmapBufferLst; - private ByteBuffer _timeBuffer; - private readonly ConcurrentClientQueue _clientQueue; private Client _client; - private int _rowIndex; - private bool _hasCatchedResult; - private RowRecord _cachedRowRecord; private bool _isClosed = false; private bool disposedValue; + private RpcDataSet _rpcDataSet; + private string _zoneId; + private readonly ConcurrentClientQueue _clientQueue; private string TimestampStr => "Time"; private int StartIndex => 2; private int Flag => 0x80; private int DefaultTimeout => 10000; public int FetchSize { get; set; } - public int RowCount { get; set; } - public SessionDataSet(string sql, TSExecuteStatementResp resp, Client client, ConcurrentClientQueue clientQueue, long statementId) + public SessionDataSet( + string sql, List ColumnNameList, List ColumnTypeList, + Dictionary ColumnNameIndexMap, long QueryId, long statementId, Client client, List QueryResult, + bool IgnoreTimeStamp, bool MoreData, string zoneId, List ColumnIndex2TsBlockColumnIndexList, ConcurrentClientQueue clientQueue + ) { - _clientQueue = clientQueue; _client = client; _sql = sql; - _queryDataset = resp.QueryDataSet; - _queryId = resp.QueryId; + _queryId = QueryId; _statementId = statementId; - _columnSize = resp.Columns.Count; - _currentBitmap = new byte[_columnSize]; - _columnNames = new List(); - _timeBuffer = new ByteBuffer(_queryDataset.Time); - // column name -> column location - _columnNameIndexMap = new Dictionary(); - _columnTypeLst = new List(); - _duplicateLocation = new Dictionary(); - _valueBufferLst = new List(); - _bitmapBufferLst = new List(); - // some internal variable - _hasCatchedResult = false; - _rowIndex = 0; - RowCount = _queryDataset.Time.Length / sizeof(long); + _columnNameIndexMap = ColumnNameIndexMap; - _columnNames = resp.Columns; - _columnTypeLst = resp.DataTypeList; - - int deduplicateIdx = 0; - Dictionary columnToFirstIndexMap = new Dictionary(); - for (var i = 0; i < _columnSize; i++) - { - var columnName = _columnNames[i]; - if (_columnNameIndexMap.ContainsKey(columnName)) - { - _duplicateLocation[i] = columnToFirstIndexMap[columnName]; - } - else - { - columnToFirstIndexMap[columnName] = i; - if (resp.ColumnNameIndexMap != null) - { - int valueIndex = resp.ColumnNameIndexMap[columnName]; - _columnNameIndexMap[columnName] = valueIndex; - _valueBufferLst.Add(new ByteBuffer(_queryDataset.ValueList[valueIndex])); - _bitmapBufferLst.Add(new ByteBuffer(_queryDataset.BitmapList[valueIndex])); - } - else - { - _columnNameIndexMap[columnName] = deduplicateIdx; - _valueBufferLst.Add(new ByteBuffer(_queryDataset.ValueList[deduplicateIdx])); - _bitmapBufferLst.Add(new ByteBuffer(_queryDataset.BitmapList[deduplicateIdx])); - } - deduplicateIdx++; - } - } - } - public List ColumnNames => _columnNames; - - - private List GetColumnNames() - { - var lst = new List - { - "timestamp" - }; - lst.AddRange(_columnNames); - return lst; - } - - public void ShowTableNames() - { - var str = GetColumnNames() - .Aggregate("", (current, name) => current + (name + "\t\t")); - - Console.WriteLine(str); + _columnNames = ColumnNameList; + _columnTypeLst = ColumnTypeList; + _zoneId = zoneId; + _clientQueue = clientQueue; + + _rpcDataSet = new RpcDataSet( + _sql, _columnNames, _columnTypeLst, _columnNameIndexMap, IgnoreTimeStamp, + MoreData, _queryId, _statementId, _client, _client.SessionId, QueryResult, FetchSize, + DefaultTimeout, _zoneId, ColumnIndex2TsBlockColumnIndexList + ); } + public bool HasNext() => _rpcDataSet.Next(); + public RowRecord Next() => _rpcDataSet.GetRow(); + public bool IsNull(string columnName) => _rpcDataSet.IsNullByColumnName(columnName); + public bool IsNullByIndex(int columnIndex) => _rpcDataSet.IsNullByIndex(columnIndex); - public bool HasNext() - { - if (_hasCatchedResult) - { - return true; - } - - // we have consumed all current data, fetch some more - if (!_timeBuffer.HasRemaining()) - { - if (!FetchResults()) - { - return false; - } - } + public bool GetBooleanByIndex(int columnIndex) => _rpcDataSet.GetBooleanByIndex(columnIndex); + public bool GetBoolean(string columnName) => _rpcDataSet.GetBoolean(columnName); - ConstructOneRow(); - _hasCatchedResult = true; - return true; - } - - public RowRecord Next() - { - if (!_hasCatchedResult) - { - if (!HasNext()) - { - return null; - } - } + public double GetDoubleByIndex(int columnIndex) => _rpcDataSet.GetDoubleByIndex(columnIndex); + public double GetDouble(string columnName) => _rpcDataSet.GetDouble(columnName); - _hasCatchedResult = false; - return _cachedRowRecord; - } - public RowRecord GetRow() - { - return _cachedRowRecord; - } + public float GetFloatByIndex(int columnIndex) => _rpcDataSet.GetFloatByIndex(columnIndex); + public float GetFloat(string columnName) => _rpcDataSet.GetFloat(columnName); - private TSDataType GetDataTypeFromStr(string str) - { - return str switch - { - "BOOLEAN" => TSDataType.BOOLEAN, - "INT32" => TSDataType.INT32, - "INT64" => TSDataType.INT64, - "FLOAT" => TSDataType.FLOAT, - "DOUBLE" => TSDataType.DOUBLE, - "TEXT" => TSDataType.TEXT, - "NULLTYPE" => TSDataType.NONE, - "TIMESTAMP" => TSDataType.TIMESTAMP, - "DATE" => TSDataType.DATE, - "BLOB" => TSDataType.BLOB, - "STRING" => TSDataType.STRING, - _ => TSDataType.STRING - }; - } + public int GetIntByIndex(int columnIndex) => _rpcDataSet.GetIntByIndex(columnIndex); + public int GetInt(string columnName) => _rpcDataSet.GetInt(columnName); - private void ConstructOneRow() - { - List fieldList = new List(); + public long GetLongByIndex(int columnIndex) => _rpcDataSet.GetLongByIndex(columnIndex); + public long GetLong(string columnName) => _rpcDataSet.GetLong(columnName); - for (int i = 0; i < _columnSize; i++) - { - if (_duplicateLocation.ContainsKey(i)) - { - var field = fieldList[_duplicateLocation[i]]; - fieldList.Add(field); - } - else - { - var columnValueBuffer = _valueBufferLst[i]; - var columnBitmapBuffer = _bitmapBufferLst[i]; + public object GetObjectByIndex(int columnIndex) => _rpcDataSet.GetObjectByIndex(columnIndex); + public object GetObject(string columnName) => _rpcDataSet.GetObject(columnName); - if (_rowIndex % 8 == 0) - { - _currentBitmap[i] = columnBitmapBuffer.GetByte(); - } + public string GetStringByIndex(int columnIndex) => _rpcDataSet.GetStringByIndex(columnIndex); + public string GetString(string columnName) => _rpcDataSet.GetString(columnName); - object localField; - if (!IsNull(i, _rowIndex)) - { - var columnDataType = GetDataTypeFromStr(_columnTypeLst[i]); + public DateTime GetTimestampByIndex(int columnIndex) => _rpcDataSet.GetTimestampByIndex(columnIndex); + public DateTime GetTimestamp(string columnName) => _rpcDataSet.GetTimestamp(columnName); + public DateTime GetDateByIndex(int columnIndex) => _rpcDataSet.GetDateByIndex(columnIndex); + public DateTime GetDate(string columnName) => _rpcDataSet.GetDate(columnName); - switch (columnDataType) - { - case TSDataType.BOOLEAN: - localField = columnValueBuffer.GetBool(); - break; - case TSDataType.INT32: - // case TSDataType.DATE: - localField = columnValueBuffer.GetInt(); - break; - case TSDataType.DATE: - localField = Utils.ParseIntToDate(columnValueBuffer.GetInt()); - break; - case TSDataType.INT64: - case TSDataType.TIMESTAMP: - localField = columnValueBuffer.GetLong(); - break; - case TSDataType.FLOAT: - localField = columnValueBuffer.GetFloat(); - break; - case TSDataType.DOUBLE: - localField = columnValueBuffer.GetDouble(); - break; - case TSDataType.TEXT: - case TSDataType.STRING: - // case TSDataType.BLOB: - localField = columnValueBuffer.GetStr(); - break; - case TSDataType.BLOB: - localField = columnValueBuffer.GetBinary(); - break; - // TODO - default: - string err_msg = "value format not supported"; - throw new TException(err_msg, null); - } + public Binary GetBlobByIndex(int columnIndex) => _rpcDataSet.GetBinaryByIndex(columnIndex); + public Binary GetBlob(string columnName) => _rpcDataSet.GetBinary(columnName); - fieldList.Add(localField); - } - else - { - localField = null; - fieldList.Add(DBNull.Value); - } - } - } + public int FindColumn(string columnName) => _rpcDataSet.FindColumn(columnName); - long timestamp = _timeBuffer.GetLong(); - _rowIndex += 1; - _cachedRowRecord = new RowRecord(timestamp, fieldList, _columnNames); - } + public IReadOnlyList GetColumnNames() => _rpcDataSet._columnNameList; + public IReadOnlyList GetColumnTypes() => _rpcDataSet._columnTypeList; - private bool IsNull(int loc, int row_index) - { - byte bitmap = _currentBitmap[loc]; - int shift = row_index % 8; - return ((Flag >> shift) & bitmap) == 0; - } + public int RowCount() => _rpcDataSet._tsBlockSize; - private bool FetchResults() + public void ShowTableNames() { - _rowIndex = 0; - var req = new TSFetchResultsReq(_client.SessionId, _sql, FetchSize, _queryId, true) - { - Timeout = DefaultTimeout - }; - try - { - var task = _client.ServiceClient.fetchResultsAsync(req); - - var resp = task.ConfigureAwait(false).GetAwaiter().GetResult(); - - if (resp.HasResultSet) - { - _queryDataset = resp.QueryDataSet; - // reset buffer - _timeBuffer = new ByteBuffer(resp.QueryDataSet.Time); - _valueBufferLst = new List(); - _bitmapBufferLst = new List(); - for (int index = 0; index < _queryDataset.ValueList.Count; index++) - { - string columnName = _columnNames[index]; - int valueIndex = _columnNameIndexMap[columnName]; - _valueBufferLst.Add(new ByteBuffer(_queryDataset.ValueList[valueIndex])); - _bitmapBufferLst.Add(new ByteBuffer(_queryDataset.BitmapList[valueIndex])); - } - - // reset row index - _rowIndex = 0; - } - - return resp.HasResultSet; - } - catch (TException e) + IReadOnlyList columns = GetColumnNames(); + foreach (string columnName in columns) { - throw new TException("Cannot fetch result from server, because of network connection", e); + Console.Write($"{columnName}\t"); } + Console.WriteLine(); } public async Task Close() @@ -332,6 +140,7 @@ public async Task Close() } finally { + await _rpcDataSet.Close(); _clientQueue.Add(_client); _client = null; } @@ -352,10 +161,6 @@ protected virtual void Dispose(bool disposing) { } } - _queryDataset = null; - _timeBuffer = null; - _valueBufferLst = null; - _bitmapBufferLst = null; disposedValue = true; } } diff --git a/src/Apache.IoTDB/DataStructure/TsBlock.cs b/src/Apache.IoTDB/DataStructure/TsBlock.cs new file mode 100644 index 0000000..14d598c --- /dev/null +++ b/src/Apache.IoTDB/DataStructure/TsBlock.cs @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; + +namespace Apache.IoTDB.DataStructure +{ + public class TsBlock + { + private readonly Column _timeColumn; + private readonly List _valueColumns; + private readonly int _positionCount; + + public TsBlock(int positionCount, Column timeColumn, params Column[] valueColumns) + { + _positionCount = positionCount; + _timeColumn = timeColumn; + _valueColumns = new List(valueColumns); + if (valueColumns == null) + throw new ArgumentNullException(nameof(valueColumns)); + if (timeColumn.GetPositionCount() != positionCount) + throw new ArgumentException( + $"input positionCount {positionCount} does not match timeColumn.positionCount {timeColumn.GetPositionCount()}" + ); + for (int i = 0; i < ValueColumnCount; i++) + { + if (valueColumns[i].GetPositionCount() != positionCount) + throw new ArgumentException( + $"input positionCount {positionCount} does not match valueColumn{i}.positionCount {valueColumns[i].GetPositionCount()}" + ); + } + } + + public static TsBlock Deserialize(ByteBuffer reader) + { + // Serialized tsblock: + // +-------------+---------------+---------+------------+-----------+----------+ + // | val col cnt | val col types | pos cnt | encodings | time col | val col | + // +-------------+---------------+---------+------------+-----------+----------+ + // | int32 | list[byte] | int32 | list[byte] | bytes | bytes | + // +-------------+---------------+---------+------------+-----------+----------+ + + // Read value column count + var valueColumnCount = reader.GetInt(); + + // Read value column data types + var valueColumnDataTypes = new TSDataType[valueColumnCount]; + for (int i = 0; i < valueColumnCount; i++) + { + valueColumnDataTypes[i] = DeserializeDataType(reader); + } + + // Read position count + var positionCount = reader.GetInt(); + + // Read column encodings + // Read time column encoding + ColumnEncoding timeColumnEncodings = DeserializeColumnEncoding(reader); + + // Read value column encodings + var valueColumnEncodings = new ColumnEncoding[valueColumnCount]; + for (int i = 1; i < valueColumnCount + 1; i++) + { + valueColumnEncodings[i - 1] = DeserializeColumnEncoding(reader); + } + + // Read time column + var timeColumnDecoder = BaseColumnDecoder.GetDecoder(timeColumnEncodings); + var timeColumn = timeColumnDecoder.ReadColumn(reader, TSDataType.INT64, positionCount); + + // Read value columns + var valueColumns = new Column[valueColumnCount]; + for (int i = 1; i < valueColumnCount + 1; i++) + { + var decoder = BaseColumnDecoder.GetDecoder(valueColumnEncodings[i - 1]); + valueColumns[i - 1] = decoder.ReadColumn(reader, valueColumnDataTypes[i - 1], positionCount); + } + + return new TsBlock(positionCount, timeColumn, valueColumns); + } + + private static TSDataType DeserializeDataType(ByteBuffer reader) + { + byte b = reader.GetByte(); + return (TSDataType)b; + } + + private static ColumnEncoding DeserializeColumnEncoding(ByteBuffer reader) + { + byte b = reader.GetByte(); + return (ColumnEncoding)b; + } + + public int PositionCount => _positionCount; + + public long GetStartTime() => _timeColumn.GetLong(0); + public long GetEndTime() => _timeColumn.GetLong(_positionCount - 1); + public bool IsEmpty => _positionCount == 0; + public long GetTimeByIndex(int index) => _timeColumn.GetLong(index); + public int ValueColumnCount => _valueColumns.Count; + public Column TimeColumn => _timeColumn; + public IReadOnlyList ValueColumns => _valueColumns; + public Column GetColumn(int columnIndex) => _valueColumns[columnIndex]; + } +} diff --git a/src/Apache.IoTDB/SessionPool.Builder.cs b/src/Apache.IoTDB/SessionPool.Builder.cs index 69b8e42..f943d81 100644 --- a/src/Apache.IoTDB/SessionPool.Builder.cs +++ b/src/Apache.IoTDB/SessionPool.Builder.cs @@ -31,7 +31,7 @@ public class Builder private string _username = "root"; private string _password = "root"; private int _fetchSize = 1024; - private string _zoneId = "UTC+08:00"; + private string _zoneId = "Asia/Shanghai"; private int _poolSize = 8; private bool _enableRpcCompression = false; private int _connectionTimeoutInMs = 500; @@ -118,7 +118,7 @@ public Builder() _username = "root"; _password = "root"; _fetchSize = 1024; - _zoneId = "UTC+08:00"; + _zoneId = "Asia/Shanghai"; _poolSize = 8; _enableRpcCompression = false; _connectionTimeoutInMs = 500; diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs index 0fb9089..135199b 100644 --- a/src/Apache.IoTDB/SessionPool.cs +++ b/src/Apache.IoTDB/SessionPool.cs @@ -66,23 +66,23 @@ public partial class SessionPool : IDisposable [Obsolete("This method is deprecated, please use new SessionPool.Builder().")] public SessionPool(string host, int port, int poolSize) - : this(host, port, "root", "root", 1024, "UTC+08:00", poolSize, true, 60) + : this(host, port, "root", "root", 1024, "Asia/Shanghai", poolSize, true, 60) { } [Obsolete(" This method is deprecated, please use new SessionPool.Builder().")] public SessionPool(string host, int port, string username, string password) - : this(host, port, username, password, 1024, "UTC+08:00", 8, true, 60) + : this(host, port, username, password, 1024, "Asia/Shanghai", 8, true, 60) { } public SessionPool(string host, int port, string username, string password, int fetchSize) - : this(host, port, username, password, fetchSize, "UTC+08:00", 8, true, 60) + : this(host, port, username, password, fetchSize, "Asia/Shanghai", 8, true, 60) { } - public SessionPool(string host, int port) : this(host, port, "root", "root", 1024, "UTC+08:00", 8, true, 60) + public SessionPool(string host, int port) : this(host, port, "root", "root", 1024, "Asia/Shanghai", 8, true, 60) { } public SessionPool(string host, int port, string username, string password, int fetchSize, string zoneId, int poolSize, bool enableRpcCompression, int timeout) @@ -110,15 +110,15 @@ protected internal SessionPool(string host, int port, string username, string pa /// The list of node URLs to connect to, multiple ip:rpcPort eg.127.0.0.1:9001 /// The size of the session pool. public SessionPool(List nodeUrls, int poolSize) - : this(nodeUrls, "root", "root", 1024, "UTC+08:00", poolSize, true, 60) + : this(nodeUrls, "root", "root", 1024, "Asia/Shanghai", poolSize, true, 60) { } public SessionPool(List nodeUrls, string username, string password) - : this(nodeUrls, username, password, 1024, "UTC+08:00", 8, true, 60) + : this(nodeUrls, username, password, 1024, "Asia/Shanghai", 8, true, 60) { } public SessionPool(List nodeUrls, string username, string password, int fetchSize) - : this(nodeUrls, username, password, fetchSize, "UTC+08:00", 8, true, 60) + : this(nodeUrls, username, password, fetchSize, "Asia/Shanghai", 8, true, 60) { } public SessionPool(List nodeUrls, string username, string password, int fetchSize, string zoneId) @@ -710,9 +710,9 @@ public async Task CheckTimeSeriesExistsAsync(string tsPath) try { var sql = "SHOW TIMESERIES " + tsPath; - var sessionDataset = await ExecuteQueryStatementAsync(sql); - bool timeSeriesExists = sessionDataset.HasNext(); - await sessionDataset.Close(); // be sure to close the sessionDataset to put the client back to the pool + var sessionDataSet = await ExecuteQueryStatementAsync(sql); + bool timeSeriesExists = sessionDataSet.HasNext(); + await sessionDataSet.Close(); // be sure to close the SessionDataSet to put the client back to the pool return timeSeriesExists; } catch (TException e) @@ -1327,7 +1327,7 @@ public async Task ExecuteQueryStatementAsync(string sql, long ti Timeout = timeoutInMs }; - var resp = await client.ServiceClient.executeQueryStatementAsync(req); + var resp = await client.ServiceClient.executeQueryStatementV2Async(req); var status = resp.Status; if (_utilFunctions.VerifySuccess(status) == -1) @@ -1335,7 +1335,11 @@ public async Task ExecuteQueryStatementAsync(string sql, long ti throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message)); } - return new SessionDataSet(sql, resp, client, _clients, client.StatementId) + return new SessionDataSet( + sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, resp.QueryId, + client.StatementId, client, resp.QueryResult, resp.IgnoreTimeStamp, + resp.MoreData, _zoneId, resp.ColumnIndex2TsBlockColumnIndexList, _clients + ) { FetchSize = _fetchSize, }; @@ -1356,7 +1360,7 @@ public async Task ExecuteStatementAsync(string sql, long timeout Timeout = timeout }; - var resp = await client.ServiceClient.executeStatementAsync(req); + var resp = await client.ServiceClient.executeStatementV2Async(req); var status = resp.Status; if (_utilFunctions.VerifySuccess(status) == -1) @@ -1364,7 +1368,11 @@ public async Task ExecuteStatementAsync(string sql, long timeout throw new Exception(string.Format("execute query failed, sql: {0}, message: {1}", sql, status.Message)); } - return new SessionDataSet(sql, resp, client, _clients, client.StatementId) + return new SessionDataSet( + sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, resp.QueryId, + client.StatementId, client, resp.QueryResult, resp.IgnoreTimeStamp, + resp.MoreData, _zoneId, resp.ColumnIndex2TsBlockColumnIndexList, _clients + ) { FetchSize = _fetchSize, }; @@ -1432,7 +1440,7 @@ public async Task ExecuteRawDataQuery(List paths, long s EnableRedirectQuery = false }; - var resp = await client.ServiceClient.executeRawDataQueryAsync(req); + var resp = await client.ServiceClient.executeRawDataQueryV2Async(req); var status = resp.Status; if (_utilFunctions.VerifySuccess(status) == -1) @@ -1440,7 +1448,11 @@ public async Task ExecuteRawDataQuery(List paths, long s throw new Exception(string.Format("execute raw data query failed, message: {0}", status.Message)); } - return new SessionDataSet("", resp, client, _clients, client.StatementId) + return new SessionDataSet( + "", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, resp.QueryId, + client.StatementId, client, resp.QueryResult, resp.IgnoreTimeStamp, + resp.MoreData, _zoneId, resp.ColumnIndex2TsBlockColumnIndexList, _clients + ) { FetchSize = _fetchSize, }; @@ -1460,7 +1472,7 @@ public async Task ExecuteLastDataQueryAsync(List paths, EnableRedirectQuery = false }; - var resp = await client.ServiceClient.executeLastDataQueryAsync(req); + var resp = await client.ServiceClient.executeLastDataQueryV2Async(req); var status = resp.Status; if (_utilFunctions.VerifySuccess(status) == -1) @@ -1468,7 +1480,11 @@ public async Task ExecuteLastDataQueryAsync(List paths, throw new Exception(string.Format("execute last data query failed, message: {0}", status.Message)); } - return new SessionDataSet("", resp, client, _clients, client.StatementId) + return new SessionDataSet( + "", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, resp.QueryId, + client.StatementId, client, resp.QueryResult, resp.IgnoreTimeStamp, + resp.MoreData, _zoneId, resp.ColumnIndex2TsBlockColumnIndexList, _clients + ) { FetchSize = _fetchSize, }; diff --git a/src/Apache.IoTDB/TableSessionPool.Builder.cs b/src/Apache.IoTDB/TableSessionPool.Builder.cs index fdeb78e..07387b5 100644 --- a/src/Apache.IoTDB/TableSessionPool.Builder.cs +++ b/src/Apache.IoTDB/TableSessionPool.Builder.cs @@ -33,7 +33,7 @@ public class Builder private string _username = "root"; private string _password = "root"; private int _fetchSize = 1024; - private string _zoneId = "UTC+08:00"; + private string _zoneId = "Asia/Shanghai"; private int _poolSize = 8; private bool _enableRpcCompression = false; private int _connectionTimeoutInMs = 500; @@ -120,7 +120,7 @@ public Builder() _username = "root"; _password = "root"; _fetchSize = 1024; - _zoneId = "UTC+08:00"; + _zoneId = "Asia/Shanghai"; _poolSize = 8; _enableRpcCompression = false; _connectionTimeoutInMs = 500;