@@ -43,19 +43,19 @@ protected virtual void SetClientCertificates(HttpWebRequest request, RequestData
4343 protected virtual void SetServerCertificateValidationCallBackIfNeeded ( HttpWebRequest request , RequestData requestData )
4444 {
4545 var callback = requestData ? . ConnectionSettings ? . ServerCertificateValidationCallback ;
46- #if ! __MonoCS__
46+ #if ! __MonoCS__
4747 //Only assign if one is defined on connection settings and a subclass has not already set one
4848 if ( callback != null && request . ServerCertificateValidationCallback == null )
4949 request . ServerCertificateValidationCallback = new RemoteCertificateValidationCallback ( callback ) ;
50- #else
50+ #else
5151 if ( callback != null )
5252 throw new Exception ( "Mono misses ServerCertificateValidationCallback on HttpWebRequest" ) ;
5353 #endif
5454 }
5555
5656 protected virtual HttpWebRequest CreateWebRequest ( RequestData requestData )
5757 {
58- var request = ( HttpWebRequest ) WebRequest . Create ( requestData . Uri ) ;
58+ var request = ( HttpWebRequest ) WebRequest . Create ( requestData . Uri ) ;
5959
6060 request . Accept = requestData . Accept ;
6161 request . ContentType = requestData . ContentType ;
@@ -76,7 +76,7 @@ protected virtual HttpWebRequest CreateWebRequest(RequestData requestData)
7676 if ( requestData . Headers != null && requestData . Headers . HasKeys ( ) )
7777 request . Headers . Add ( requestData . Headers ) ;
7878
79- var timeout = ( int ) requestData . RequestTimeout . TotalMilliseconds ;
79+ var timeout = ( int ) requestData . RequestTimeout . TotalMilliseconds ;
8080 request . Timeout = timeout ;
8181 request . ReadWriteTimeout = timeout ;
8282
@@ -101,7 +101,6 @@ protected virtual void AlterServicePoint(ServicePoint requestServicePoint, Reque
101101 //this method only sets internal values and wont actually cause timers and such to be reset
102102 //So it should be idempotent if called with the same parameters
103103 requestServicePoint . SetTcpKeepAlive ( true , requestData . KeepAliveTime , requestData . KeepAliveInterval ) ;
104-
105104 }
106105
107106 protected virtual void SetProxyIfNeeded ( HttpWebRequest request , RequestData requestData )
@@ -161,8 +160,8 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
161160 //Either the stream or the response object needs to be closed but not both although it won't
162161 //throw any errors if both are closed atleast one of them has to be Closed.
163162 //Since we expose the stream we let closing the stream determining when to close the connection
164- var response = ( HttpWebResponse ) request . GetResponse ( ) ;
165- builder . StatusCode = ( int ) response . StatusCode ;
163+ var response = ( HttpWebResponse ) request . GetResponse ( ) ;
164+ builder . StatusCode = ( int ) response . StatusCode ;
166165 builder . Stream = response . GetResponseStream ( ) ;
167166
168167 if ( response . SupportsHeaders && response . Headers . HasKeys ( ) && response . Headers . AllKeys . Contains ( "Warning" ) )
@@ -180,8 +179,17 @@ public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData reque
180179 }
181180
182181
183- private static RegisteredWaitHandle RegisterApmTaskTimeout ( IAsyncResult result , WebRequest request , RequestData requestData ) =>
184- ThreadPool . RegisterWaitForSingleObject ( result . AsyncWaitHandle , TimeoutCallback , request , requestData . RequestTimeout , true ) ;
182+ /// <summary>
183+ /// Registers an APM async task cancellation on the threadpool
184+ /// </summary>
185+ /// <returns>An unregister action that can be used to remove the waithandle prematurely</returns>
186+ private static Action RegisterApmTaskTimeout ( IAsyncResult result , WebRequest request , RequestData requestData )
187+ {
188+ var waitHandle = result . AsyncWaitHandle ;
189+ var registeredWaitHandle =
190+ ThreadPool . RegisterWaitForSingleObject ( waitHandle , TimeoutCallback , request , requestData . RequestTimeout , true ) ;
191+ return ( ) => registeredWaitHandle ? . Unregister ( waitHandle ) ;
192+ }
185193
186194 private static void TimeoutCallback ( object state , bool timedOut )
187195 {
@@ -193,74 +201,65 @@ public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(
193201 CancellationToken cancellationToken ) where TReturn : class
194202 {
195203 var builder = new ResponseBuilder < TReturn > ( requestData , cancellationToken ) ;
196- WaitHandle apmWaitHandle = null ;
197- RegisteredWaitHandle apmTaskTimeout = null ;
204+ Action unregisterWaitHandle = null ;
198205 try
199206 {
200207 var data = requestData . PostData ;
201208 var request = this . CreateHttpWebRequest ( requestData ) ;
202209 using ( cancellationToken . Register ( ( ) => request . Abort ( ) ) )
203210 {
204211 if ( data != null )
205- await PostRequestAsync ( requestData , cancellationToken , request , data ) ;
212+ {
213+ var apmGetRequestStreamTask = Task . Factory . FromAsync ( request . BeginGetRequestStream , request . EndGetRequestStream , null ) ;
214+ unregisterWaitHandle = RegisterApmTaskTimeout ( apmGetRequestStreamTask , request , requestData ) ;
215+
216+ using ( var stream = await apmGetRequestStreamTask . ConfigureAwait ( false ) )
217+ {
218+ if ( requestData . HttpCompression )
219+ using ( var zipStream = new GZipStream ( stream , CompressionMode . Compress ) )
220+ await data . WriteAsync ( zipStream , requestData . ConnectionSettings , cancellationToken ) . ConfigureAwait ( false ) ;
221+ else
222+ await data . WriteAsync ( stream , requestData . ConnectionSettings , cancellationToken ) . ConfigureAwait ( false ) ;
223+ }
224+ unregisterWaitHandle ? . Invoke ( ) ;
225+ }
206226 requestData . MadeItToResponse = true ;
207- //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
208- //Either the stream or the response object needs to be closed but not both although it won't
209- //throw any errors if both are closed atleast one of them has to be Closed.
210- //Since we expose the stream we let closing the stream determining when to close the connection
211-
212- var apmGetResponseTask = Task . Factory . FromAsync ( request . BeginGetResponse , request . EndGetResponse , null ) ;
213- apmWaitHandle = ( ( IAsyncResult ) apmGetResponseTask ) . AsyncWaitHandle ;
214- apmTaskTimeout = RegisterApmTaskTimeout ( apmGetResponseTask , request , requestData ) ;
215-
216- var response = ( HttpWebResponse ) ( await apmGetResponseTask . ConfigureAwait ( false ) ) ;
217- builder . StatusCode = ( int ) response . StatusCode ;
218- builder . Stream = response . GetResponseStream ( ) ;
219- if ( response . SupportsHeaders && response . Headers . HasKeys ( ) && response . Headers . AllKeys . Contains ( "Warning" ) )
220- builder . DeprecationWarnings = response . Headers . GetValues ( "Warning" ) ;
221- // https://github.com/elastic/elasticsearch-net/issues/2311
222- // if stream is null call dispose on response instead.
223- if ( builder . Stream == null || builder . Stream == Stream . Null ) response . Dispose ( ) ;
224- if ( apmWaitHandle != null ) apmTaskTimeout ? . Unregister ( apmWaitHandle ) ;
227+ //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
228+ //Either the stream or the response object needs to be closed but not both although it won't
229+ //throw any errors if both are closed atleast one of them has to be Closed.
230+ //Since we expose the stream we let closing the stream determining when to close the connection
231+
232+ var apmGetResponseTask = Task . Factory . FromAsync ( request . BeginGetResponse , request . EndGetResponse , null ) ;
233+ unregisterWaitHandle = RegisterApmTaskTimeout ( apmGetResponseTask , request , requestData ) ;
234+
235+ var response = ( HttpWebResponse ) ( await apmGetResponseTask . ConfigureAwait ( false ) ) ;
236+ builder . StatusCode = ( int ) response . StatusCode ;
237+ builder . Stream = response . GetResponseStream ( ) ;
238+ if ( response . SupportsHeaders && response . Headers . HasKeys ( ) && response . Headers . AllKeys . Contains ( "Warning" ) )
239+ builder . DeprecationWarnings = response . Headers . GetValues ( "Warning" ) ;
240+ // https://github.com/elastic/elasticsearch-net/issues/2311
241+ // if stream is null call dispose on response instead.
242+ if ( builder . Stream == null || builder . Stream == Stream . Null ) response . Dispose ( ) ;
225243 }
226244 }
227245 catch ( WebException e )
228246 {
229- if ( apmWaitHandle != null ) apmTaskTimeout ? . Unregister ( apmWaitHandle ) ;
230247 HandleException ( builder , e ) ;
231248 }
232- catch
249+ finally
233250 {
234- if ( apmWaitHandle != null ) apmTaskTimeout ? . Unregister ( apmWaitHandle ) ;
235- throw ;
251+ unregisterWaitHandle ? . Invoke ( ) ;
236252 }
237253 return await builder . ToResponseAsync ( ) . ConfigureAwait ( false ) ;
238254 }
239255
240- private static async Task PostRequestAsync ( RequestData requestData , CancellationToken cancellationToken , HttpWebRequest request ,
241- PostData < object > data )
242- {
243- var apmGetRequestStreamTask = Task . Factory . FromAsync ( request . BeginGetRequestStream , request . EndGetRequestStream , null ) ;
244- var getRequestStreamCancellationHandle = RegisterApmTaskTimeout ( apmGetRequestStreamTask , request , requestData ) ;
245-
246- using ( var stream = await apmGetRequestStreamTask . ConfigureAwait ( false ) )
247- {
248- if ( requestData . HttpCompression )
249- using ( var zipStream = new GZipStream ( stream , CompressionMode . Compress ) )
250- await data . WriteAsync ( zipStream , requestData . ConnectionSettings , cancellationToken ) . ConfigureAwait ( false ) ;
251- else
252- await data . WriteAsync ( stream , requestData . ConnectionSettings , cancellationToken ) . ConfigureAwait ( false ) ;
253- }
254- getRequestStreamCancellationHandle . Unregister ( ( ( IAsyncResult ) apmGetRequestStreamTask ) . AsyncWaitHandle ) ;
255- }
256-
257- private void HandleException < TReturn > ( ResponseBuilder < TReturn > builder , WebException exception )
256+ private static void HandleException < TReturn > ( ResponseBuilder < TReturn > builder , WebException exception )
258257 where TReturn : class
259258 {
260259 builder . Exception = exception ;
261260 var response = exception . Response as HttpWebResponse ;
262261 if ( response == null ) return ;
263- builder . StatusCode = ( int ) response . StatusCode ;
262+ builder . StatusCode = ( int ) response . StatusCode ;
264263 builder . Stream = response . GetResponseStream ( ) ;
265264 // https://github.com/elastic/elasticsearch-net/issues/2311
266265 // if stream is null call dispose on response instead.
@@ -269,7 +268,9 @@ private void HandleException<TReturn>(ResponseBuilder<TReturn> builder, WebExcep
269268
270269 void IDisposable . Dispose ( ) => this . DisposeManagedResources ( ) ;
271270
272- protected virtual void DisposeManagedResources ( ) { }
271+ protected virtual void DisposeManagedResources ( )
272+ {
273+ }
273274 }
274275}
275276#endif
0 commit comments