Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions pkgs/sdk/server/src/Interfaces/DataSourceStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public struct ErrorInfo
/// </summary>
public ErrorKind Kind { get; set; }

/// <summary>
/// Whether the error is recoverable. Recoverable errors are those that can be retried, such as network errors. Unrecoverable
/// errors are those that cannot be retried, such as invalid SDK key errors.
/// </summary>
public bool Recoverable { get; set; }

/// <summary>
/// The HTTP status code if the error was <see cref="ErrorKind.ErrorResponse"/>, or zero otherwise.
/// </summary>
Expand Down Expand Up @@ -90,24 +96,28 @@ public struct ErrorInfo
/// Constructs an instance based on an exception.
/// </summary>
/// <param name="e">the exception</param>
/// <param name="recoverable">whether the error is recoverable</param>
/// <returns>an ErrorInfo</returns>
public static ErrorInfo FromException(Exception e) => new ErrorInfo
public static ErrorInfo FromException(Exception e, bool recoverable) => new ErrorInfo
{
Kind = e is IOException ? ErrorKind.NetworkError : ErrorKind.Unknown,
Message = e.Message,
Time = DateTime.Now
Time = DateTime.Now,
Recoverable = recoverable
};

/// <summary>
/// Constructs an instance based on an HTTP error status.
/// </summary>
/// <param name="statusCode">the status code</param>
/// <param name="recoverable">whether the error is recoverable</param>
/// <returns>an ErrorInfo</returns>
public static ErrorInfo FromHttpError(int statusCode) => new ErrorInfo
public static ErrorInfo FromHttpError(int statusCode, bool recoverable) => new ErrorInfo
{
Kind = ErrorKind.ErrorResponse,
StatusCode = statusCode,
Time = DateTime.Now
Time = DateTime.Now,
Recoverable = recoverable
};

/// <inheritdoc/>
Expand Down
53 changes: 37 additions & 16 deletions pkgs/sdk/server/src/Internal/DataSources/PollingDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ internal sealed class PollingDataSource : IDataSource
private readonly Logger _log;
private CancellationTokenSource _canceller;

private bool _disposed = false;
private readonly AtomicBoolean _shuttingDown = new AtomicBoolean(false);

internal PollingDataSource(
LdClientContext context,
IFeatureRequestor featureRequestor,
Expand Down Expand Up @@ -81,62 +84,80 @@ private async Task UpdateTaskAsync()
}
catch (UnsuccessfulResponseException ex)
{
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode);
var recoverable = HttpErrors.IsRecoverable(ex.StatusCode);
var errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(ex.StatusCode, recoverable);

if (HttpErrors.IsRecoverable(ex.StatusCode))
if (errorInfo.Recoverable)
{
_log.Warn(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", "will retry"));
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
}
else
{
_log.Error(HttpErrors.ErrorMessage(ex.StatusCode, "polling request", ""));
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
try
{
// if client is initializing, make it stop waiting
_initTask.SetResult(true);
_initTask.SetResult(false);
Copy link
Contributor Author

@tanderson-ld tanderson-ld Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'Twas the night before bugmas

}
catch (InvalidOperationException)
{
// the task was already set - nothing more to do
}
((IDisposable)this).Dispose();
Shutdown(errorInfo);
}
}
catch (JsonException ex)
{
_log.Error("Polling request received malformed data: {0}", LogValues.ExceptionSummary(ex));
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
new DataSourceStatus.ErrorInfo
{
Kind = DataSourceStatus.ErrorKind.InvalidData,
Time = DateTime.Now
});
var errorInfo = new DataSourceStatus.ErrorInfo
{
Kind = DataSourceStatus.ErrorKind.InvalidData,
Message = ex.Message,
Time = DateTime.Now,
Recoverable = true
};
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
}
catch (Exception ex)
{
Exception realEx = (ex is AggregateException ae) ? ae.Flatten() : ex;
_log.Warn("Polling for feature flag updates failed: {0}", LogValues.ExceptionSummary(ex));
_log.Debug(LogValues.ExceptionTrace(ex));
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted,
DataSourceStatus.ErrorInfo.FromException(realEx));
var errorInfo = DataSourceStatus.ErrorInfo.FromException(realEx, true); // default to recoverable
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
}
}

/// <summary>
/// Shuts the data source down (with no error information) and then releases its resources.
/// </summary>
void IDisposable.Dispose()
{
    // Disposal also carries the shutdown responsibility for now, so shutdown runs
    // before any resources are released.
    Shutdown(errorInfo: null);

    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}

private void Dispose(bool disposing)
{
if (disposing)
{
_canceller?.Cancel();
if (_disposed) return;

if (disposing) {
// dispose managed resources if any
_featureRequestor.Dispose();
}

_disposed = true;
}

/// <summary>
/// Cancels the canceller (if one exists) and reports <c>DataSourceState.Off</c> to the
/// data source updates sink. Only the first call has any effect.
/// </summary>
/// <param name="errorInfo">the error to report with the Off status, or null if there is none</param>
private void Shutdown(DataSourceStatus.ErrorInfo? errorInfo)
{
    // GetAndSet hands back the value held before the write, so a true result means
    // another caller already started shutting down and this call must do nothing.
    var alreadyShuttingDown = _shuttingDown.GetAndSet(true);
    if (alreadyShuttingDown)
    {
        return;
    }

    _canceller?.Cancel();
    _dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
}

private bool InitWithHeaders(DataStoreTypes.FullDataSet<DataStoreTypes.ItemDescriptor> allData,
Expand Down
58 changes: 40 additions & 18 deletions pkgs/sdk/server/src/Internal/DataSources/StreamingDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ internal class StreamingDataSource : IDataSource

private IEnumerable<KeyValuePair<string, IEnumerable<string>>> _headers;

private bool _disposed = false;
private readonly AtomicBoolean _shuttingDown = new AtomicBoolean(false);

internal delegate IEventSource EventSourceCreator(Uri streamUri,
HttpConfiguration httpConfig);

Expand Down Expand Up @@ -104,20 +107,36 @@ public Task<bool> Start()

/// <summary>
/// Shuts the data source down (with no error information) and then releases its resources.
/// </summary>
public void Dispose()
{
    // Disposal also carries the shutdown responsibility for now, so shutdown runs
    // before any resources are released.
    Shutdown(errorInfo: null);

    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}

private void Dispose(bool disposing)
{
if (disposing)
if (_disposed) return;

if (disposing) {
// dispose managed resources if any
}

_disposed = true;
}

private void Shutdown(DataSourceStatus.ErrorInfo? errorInfo)
{
// Prevent concurrent shutdown calls - only allow the first call to proceed
// GetAndSet returns the OLD value, so if it was already true, we return early
if (_shuttingDown.GetAndSet(true)) return;

_es.Close();
if (_storeStatusMonitoringEnabled)
{
_es.Close();
if (_storeStatusMonitoringEnabled)
{
_dataSourceUpdates.DataStoreStatusProvider.StatusChanged -= OnDataStoreStatusChanged;
}
_dataSourceUpdates.DataStoreStatusProvider.StatusChanged -= OnDataStoreStatusChanged;
}
_dataSourceUpdates.UpdateStatus(DataSourceState.Off, errorInfo);
}

#endregion
Expand Down Expand Up @@ -175,7 +194,8 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
{
Kind = DataSourceStatus.ErrorKind.InvalidData,
Message = ex.Message,
Time = DateTime.Now
Time = DateTime.Now,
Recoverable = true
};
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);

Expand All @@ -187,7 +207,8 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
{
Kind = DataSourceStatus.ErrorKind.StoreError,
Message = (ex.InnerException ?? ex).Message,
Time = DateTime.Now
Time = DateTime.Now,
Recoverable = true
};
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
if (!_storeStatusMonitoringEnabled)
Expand All @@ -210,17 +231,16 @@ private void OnMessage(object sender, EventSource.MessageReceivedEventArgs e)
private void OnError(object sender, EventSource.ExceptionEventArgs e)
{
var ex = e.Exception;
var recoverable = true;
DataSourceStatus.ErrorInfo errorInfo;

if (ex is EventSourceServiceUnsuccessfulResponseException respEx)
{
int status = respEx.StatusCode;
errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(status);
var recoverable = HttpErrors.IsRecoverable(status);
errorInfo = DataSourceStatus.ErrorInfo.FromHttpError(status, recoverable);
RecordStreamInit(true);
if (!HttpErrors.IsRecoverable(status))
if (!recoverable)
{
recoverable = false;
_log.Error(HttpErrors.ErrorMessage(status, "streaming connection", ""));
}
else
Expand All @@ -230,21 +250,23 @@ private void OnError(object sender, EventSource.ExceptionEventArgs e)
}
else
{
errorInfo = DataSourceStatus.ErrorInfo.FromException(ex);
errorInfo = DataSourceStatus.ErrorInfo.FromException(ex, true); // default to recoverable
_log.Warn("Encountered EventSource error: {0}", LogValues.ExceptionSummary(ex));
_log.Debug(LogValues.ExceptionTrace(ex));
}

_dataSourceUpdates.UpdateStatus(recoverable ? DataSourceState.Interrupted : DataSourceState.Off,
errorInfo);

if (!recoverable)
if (errorInfo.Recoverable)
{
_dataSourceUpdates.UpdateStatus(DataSourceState.Interrupted, errorInfo);
return;
}
else
{
// Make _initTask complete to tell the client to stop waiting for initialization. We use
// TrySetResult rather than SetResult here because it might have already been completed
// (if for instance the stream started successfully, then restarted and got a 401).
_initTask.TrySetResult(false);
((IDisposable)this).Dispose();
Shutdown(errorInfo);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ public void UpdateStatus(DataSourceState newState, DataSourceStatus.ErrorInfo? n
// When a synchronizer reports it is off, fall back immediately
if (newState == DataSourceState.Off)
{
_actionable.BlacklistCurrent();
if (newError != null && !newError.Value.Recoverable)
{
_actionable.BlacklistCurrent();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now if a data source decides to shut down for a recoverable reason (who knows what that will be in the future), we don't blacklist too aggressively.

_actionable.DisposeCurrent();
_actionable.GoToNext();
_actionable.StartCurrent();
Expand Down
Loading
Loading