Skip to content

Commit f4fa0c7

Browse files
committed
Make DataReader disposing underlying command on reaching the end of a sequence
# Conflicts: # Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/BatchingCommandProcessor.cs # Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/Command.cs
1 parent 81255c8 commit f4fa0c7

File tree

4 files changed

+122
-71
lines changed

4 files changed

+122
-71
lines changed

Orm/Xtensive.Orm.Tests/Issues/IssueGitHub071_DataReaderRemainsOpen.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System;
66
using System.Collections.Generic;
77
using System.Linq;
8-
using System.Text;
98
using NUnit.Framework;
109
using Xtensive.Orm.Configuration;
1110
using Xtensive.Orm.Tests.Issues.IssueGitHub071_DataReaderRemainsOpenModel;

Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/BatchingCommandProcessor.cs

Lines changed: 78 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -29,55 +29,57 @@ void ISqlTaskProcessor.ProcessTask(SqlPersistTask task, CommandProcessorContext
2929
ProcessUnbatchedTask(task, context);
3030
return;
3131
}
32+
3233
var sequence = Factory.CreatePersistParts(task, GetParameterPrefix(context));
33-
foreach (var part in sequence)
34+
foreach (var part in sequence) {
3435
context.ActiveCommand.AddPart(part);
36+
}
3537
}
3638

37-
public override void RegisterTask(SqlTask task)
38-
{
39-
tasks.Enqueue(task);
40-
}
39+
public override void RegisterTask(SqlTask task) => tasks.Enqueue(task);
4140

42-
public override void ClearTasks()
43-
{
44-
tasks.Clear();
45-
}
41+
public override void ClearTasks() => tasks.Clear();
4642

4743
public override void ExecuteTasks(CommandProcessorContext context)
4844
{
4945
PutTasksForExecution(context);
5046

51-
while (context.ProcessingTasks.Count >= batchSize)
47+
while (context.ProcessingTasks.Count >= batchSize) {
5248
ExecuteBatch(batchSize, null, context);
49+
}
5350

54-
if (!context.AllowPartialExecution)
51+
if (!context.AllowPartialExecution) {
5552
ExecuteBatch(context.ProcessingTasks.Count, null, context);
53+
}
5654
}
5755

5856
public override async Task ExecuteTasksAsync(CommandProcessorContext context, CancellationToken token)
5957
{
6058
PutTasksForExecution(context);
6159

62-
while (context.ProcessingTasks.Count >= batchSize)
60+
while (context.ProcessingTasks.Count >= batchSize) {
6361
await ExecuteBatchAsync(batchSize, null, context, token).ConfigureAwait(false);
62+
}
6463

65-
if (!context.AllowPartialExecution)
64+
if (!context.AllowPartialExecution) {
6665
await ExecuteBatchAsync(context.ProcessingTasks.Count, null, context, token).ConfigureAwait(false);
66+
}
6767
}
6868

6969
public override DataReader ExecuteTasksWithReader(QueryRequest request, CommandProcessorContext context)
7070
{
7171
context.AllowPartialExecution = false;
7272
PutTasksForExecution(context);
7373

74-
while (context.ProcessingTasks.Count >= batchSize)
74+
while (context.ProcessingTasks.Count >= batchSize) {
7575
ExecuteBatch(batchSize, null, context);
76+
}
7677

7778
return ExecuteBatch(context.ProcessingTasks.Count, request, context).CreateReader(request.GetAccessor());
7879
}
7980

80-
public override async Task<DataReader> ExecuteTasksWithReaderAsync(QueryRequest request, CommandProcessorContext context, CancellationToken token)
81+
public override async Task<DataReader> ExecuteTasksWithReaderAsync(QueryRequest request,
82+
CommandProcessorContext context, CancellationToken token)
8183
{
8284
context.ProcessingTasks = new Queue<SqlTask>(tasks);
8385
tasks.Clear();
@@ -94,10 +96,11 @@ public override async Task<DataReader> ExecuteTasksWithReaderAsync(QueryRequest
9496

9597
private Command ExecuteBatch(int numberOfTasks, QueryRequest lastRequest, CommandProcessorContext context)
9698
{
97-
var shouldReturnReader = lastRequest!=null;
99+
var shouldReturnReader = lastRequest != null;
98100

99-
if (numberOfTasks==0 && !shouldReturnReader)
101+
if (numberOfTasks == 0 && !shouldReturnReader) {
100102
return null;
103+
}
101104

102105
var tasksToProcess = context.ProcessingTasks;
103106

@@ -109,48 +112,58 @@ private Command ExecuteBatch(int numberOfTasks, QueryRequest lastRequest, Comman
109112
var task = tasksToProcess.Dequeue();
110113
task.ProcessWith(this, context);
111114
}
115+
116+
var command = context.ActiveCommand;
112117
if (shouldReturnReader) {
113118
var part = Factory.CreateQueryPart(lastRequest, context.ParameterContext);
114-
context.ActiveCommand.AddPart(part);
119+
command.AddPart(part);
115120
}
116-
if (context.ActiveCommand.Count==0)
121+
122+
if (command.Count == 0) {
117123
return null;
124+
}
125+
118126
var hasQueryTasks = context.ActiveTasks.Count > 0;
119127
if (!hasQueryTasks && !shouldReturnReader) {
120-
context.ActiveCommand.ExecuteNonQuery();
128+
command.ExecuteNonQuery();
121129
return null;
122130
}
123-
context.ActiveCommand.ExecuteReader();
131+
132+
command.ExecuteReader();
124133
if (hasQueryTasks) {
125134
var currentQueryTask = 0;
126135
while (currentQueryTask < context.ActiveTasks.Count) {
127136
var queryTask = context.ActiveTasks[currentQueryTask];
128137
var accessor = queryTask.Request.GetAccessor();
129138
var result = queryTask.Output;
130-
var reader = context.ActiveCommand.CreateReader(accessor);
131-
while (reader.MoveNext()) {
132-
result.Add(reader.Current);
139+
while (command.NextRow()) {
140+
result.Add(command.ReadTupleWith(accessor));
133141
}
134142

135-
context.ActiveCommand.NextResult();
143+
_ = command.NextResult();
136144
currentQueryTask++;
137145
}
138146
}
139-
return shouldReturnReader ? context.ActiveCommand : null;
147+
148+
return shouldReturnReader ? command : null;
140149
}
141150
finally {
142-
if (!shouldReturnReader)
151+
if (!shouldReturnReader) {
143152
context.ActiveCommand.Dispose();
153+
}
154+
144155
ReleaseCommand(context);
145156
}
146157
}
147158

148-
private async Task<Command> ExecuteBatchAsync(int numberOfTasks, QueryRequest lastRequest, CommandProcessorContext context, CancellationToken token)
159+
private async Task<Command> ExecuteBatchAsync(int numberOfTasks, QueryRequest lastRequest,
160+
CommandProcessorContext context, CancellationToken token)
149161
{
150-
var shouldReturnReader = lastRequest!=null;
162+
var shouldReturnReader = lastRequest != null;
151163

152-
if (numberOfTasks==0 && !shouldReturnReader)
164+
if (numberOfTasks == 0 && !shouldReturnReader) {
153165
return null;
166+
}
154167

155168
var tasksToProcess = context.ProcessingTasks;
156169

@@ -162,36 +175,45 @@ private async Task<Command> ExecuteBatchAsync(int numberOfTasks, QueryRequest la
162175
var task = tasksToProcess.Dequeue();
163176
task.ProcessWith(this, context);
164177
}
165-
if (shouldReturnReader)
166-
context.ActiveCommand.AddPart(Factory.CreateQueryPart(lastRequest, context.ParameterContext));
167-
if (context.ActiveCommand.Count==0)
178+
179+
var command = context.ActiveCommand;
180+
if (shouldReturnReader) {
181+
command.AddPart(Factory.CreateQueryPart(lastRequest, context.ParameterContext));
182+
}
183+
184+
if (command.Count == 0) {
168185
return null;
186+
}
187+
169188
var hasQueryTasks = context.ActiveTasks.Count > 0;
170189
if (!hasQueryTasks && !shouldReturnReader) {
171-
await context.ActiveCommand.ExecuteNonQueryAsync(token).ConfigureAwait(false);
190+
await command.ExecuteNonQueryAsync(token).ConfigureAwait(false);
172191
return null;
173192
}
174-
await context.ActiveCommand.ExecuteReaderAsync(token).ConfigureAwait(false);
193+
194+
await command.ExecuteReaderAsync(token).ConfigureAwait(false);
175195
if (hasQueryTasks) {
176-
int currentQueryTask = 0;
196+
var currentQueryTask = 0;
177197
while (currentQueryTask < context.ActiveTasks.Count) {
178198
var queryTask = context.ActiveTasks[currentQueryTask];
179199
var accessor = queryTask.Request.GetAccessor();
180200
var result = queryTask.Output;
181-
var reader = context.ActiveCommand.CreateReader(accessor);
182-
while (reader.MoveNext()) {
183-
result.Add(reader.Current);
201+
while (await command.NextRowAsync(token).ConfigureAwait(false)) {
202+
result.Add(command.ReadTupleWith(accessor));
184203
}
185204

186-
await context.ActiveCommand.NextResultAsync().ConfigureAwait(false);
205+
_ = await command.NextResultAsync(token).ConfigureAwait(false);
187206
currentQueryTask++;
188207
}
189208
}
190-
return shouldReturnReader ? context.ActiveCommand : null;
209+
210+
return shouldReturnReader ? command : null;
191211
}
192212
finally {
193-
if (!shouldReturnReader)
194-
context.ActiveCommand.Dispose();
213+
if (!shouldReturnReader) {
214+
await context.ActiveCommand.DisposeAsync().ConfigureAwait(false);
215+
}
216+
195217
ReleaseCommand(context);
196218
}
197219
}
@@ -203,19 +225,20 @@ private void ProcessUnbatchedTask(SqlPersistTask task, CommandProcessorContext c
203225
ReleaseCommand(context);
204226
AllocateCommand(context);
205227
}
228+
206229
ExecuteUnbatchedTask(task);
207230
}
208231

209232
private void ExecuteUnbatchedTask(SqlPersistTask task)
210233
{
211234
var sequence = Factory.CreatePersistParts(task);
212235
foreach (var part in sequence) {
213-
using (var command = Factory.CreateCommand()) {
214-
command.AddPart(part);
215-
var affectedRowsCount = command.ExecuteNonQuery();
216-
if (affectedRowsCount==0)
217-
throw new VersionConflictException(string.Format(
218-
Strings.ExVersionOfEntityWithKeyXDiffersFromTheExpectedOne, task.EntityKey));
236+
using var command = Factory.CreateCommand();
237+
command.AddPart(part);
238+
var affectedRowsCount = command.ExecuteNonQuery();
239+
if (affectedRowsCount == 0) {
240+
throw new VersionConflictException(string.Format(
241+
Strings.ExVersionOfEntityWithKeyXDiffersFromTheExpectedOne, task.EntityKey));
219242
}
220243
}
221244
}
@@ -225,22 +248,22 @@ private void PutTasksForExecution(CommandProcessorContext context)
225248
if (context.AllowPartialExecution) {
226249
context.ProcessingTasks = new Queue<SqlTask>();
227250
var batchesCount = tasks.Count / batchSize;
228-
if (batchesCount==0)
251+
if (batchesCount == 0) {
229252
return;
253+
}
254+
230255
context.ProcessingTasks = new Queue<SqlTask>();
231-
while (context.ProcessingTasks.Count < batchesCount * batchSize)
256+
while (context.ProcessingTasks.Count < batchesCount * batchSize) {
232257
context.ProcessingTasks.Enqueue(tasks.Dequeue());
258+
}
233259
}
234260
else {
235261
context.ProcessingTasks = new Queue<SqlTask>(tasks);
236262
tasks.Clear();
237263
}
238264
}
239265

240-
private string GetParameterPrefix(CommandProcessorContext context)
241-
{
242-
return string.Format("p{0}_", context.ActiveCommand.Count + 1);
243-
}
266+
private static string GetParameterPrefix(CommandProcessorContext context) => $"p{context.ActiveCommand.Count + 1}_";
244267

245268
#endregion
246269

Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/Command.cs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public sealed class Command : IDisposable, IAsyncDisposable
2525
private readonly List<string> statements = new List<string>();
2626

2727
private bool prepared;
28+
private bool isDisposed;
2829
private DisposableSet resources;
2930
private DbDataReader reader;
3031

@@ -42,7 +43,7 @@ public void AddPart(CommandPart part)
4243
underlyingCommand.Parameters.Add(parameter);
4344
}
4445

45-
if (part.Resources.Count==0) {
46+
if (part.Resources.Count == 0) {
4647
return;
4748
}
4849

@@ -124,7 +125,7 @@ public DataReader CreateReader(DbDataReaderAccessor accessor, CancellationToken
124125

125126
public DbCommand Prepare()
126127
{
127-
if (statements.Count==0) {
128+
if (statements.Count == 0) {
128129
throw new InvalidOperationException("Unable to prepare command: no parts registered");
129130
}
130131

@@ -142,16 +143,22 @@ private StorageException TranslateException(Exception exception) =>
142143

143144
public void Dispose()
144145
{
145-
reader.DisposeSafely();
146-
resources.DisposeSafely();
147-
underlyingCommand.DisposeSafely();
146+
if (!isDisposed) {
147+
isDisposed = true;
148+
reader.DisposeSafely();
149+
resources.DisposeSafely();
150+
underlyingCommand.DisposeSafely();
151+
}
148152
}
149153

150154
public async ValueTask DisposeAsync()
151155
{
152-
await reader.DisposeSafelyAsync().ConfigureAwait(false);
153-
await resources.DisposeSafelyAsync().ConfigureAwait(false);
154-
await underlyingCommand.DisposeSafelyAsync().ConfigureAwait(false);
156+
if (!isDisposed) {
157+
isDisposed = true;
158+
await reader.DisposeSafelyAsync().ConfigureAwait(false);
159+
await resources.DisposeSafelyAsync().ConfigureAwait(false);
160+
await underlyingCommand.DisposeSafelyAsync().ConfigureAwait(false);
161+
}
155162
}
156163

157164
// Constructors

Orm/Xtensive.Orm/Orm/Providers/CommandProcessing/DataReader.cs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,36 @@ namespace Xtensive.Orm.Providers
3737
object IEnumerator.Current => Current;
3838

3939
/// <inheritdoc/>
40-
public bool MoveNext() => source is Command command
41-
? command.NextRow()
42-
: ((IEnumerator<Tuple>) source).MoveNext();
40+
public bool MoveNext()
41+
{
42+
if (!(source is Command command)) {
43+
return ((IEnumerator<Tuple>) source).MoveNext();
44+
}
45+
46+
if (command.NextRow()) {
47+
return true;
48+
}
49+
50+
// We don't need the command anymore because all records are processed to the moment.
51+
command.Dispose();
52+
return false;
53+
}
4354

4455
/// <inheritdoc/>
45-
public async ValueTask<bool> MoveNextAsync() => source is Command command
46-
? await command.NextRowAsync(token).ConfigureAwait(false)
47-
: ((IEnumerator<Tuple>) source).MoveNext();
56+
public async ValueTask<bool> MoveNextAsync()
57+
{
58+
if (!(source is Command command)) {
59+
return ((IEnumerator<Tuple>) source).MoveNext();
60+
}
61+
62+
if (await command.NextRowAsync(token).ConfigureAwait(false)) {
63+
return true;
64+
}
65+
66+
// We don't need the command anymore because all records are processed to the moment.
67+
await command.DisposeAsync().ConfigureAwait(false);
68+
return false;
69+
}
4870

4971
/// <inheritdoc/>
5072
public void Reset()
@@ -101,4 +123,4 @@ public DataReader(Command command, DbDataReaderAccessor accessor, CancellationTo
101123
this.token = token;
102124
}
103125
}
104-
}
126+
}

0 commit comments

Comments
 (0)