Rx Challenge #6 Solution

2015/04/19

no comments

the previous challenge was about weakening the reference to the subscriber.

you can see the challenge here.

my solution for this challenge involve WeakObserver class + extension method

which will create mediator of the weak observer each subscription.

the following code snippet describe my solution:

public class WeakObserver<T> : IObserver<T>
{
private readonly WeakReference<IObserver<T>> _target;

#region Ctor

/// <summary>
/// Initializes a new instance of the <see cref="WeakObserver{T}"/> class.
/// </summary>
/// <param name="target">The target.</param>
/// <exception cref="System.ArgumentNullException">target</exception>
public WeakObserver(IObserver<T> target)
{
#region Validation

if (target == null)
throw new ArgumentNullException("target");

#endregion Validation

_target = new WeakReference<IObserver<T>>(target);
}

#endregion Ctor

#region Target

/// <summary>
/// Gets the target.
/// </summary>
/// <value>
/// The target.
/// </value>
private IObserver<T> Target
{
get
{
IObserver<T> target;
if (_target.TryGetTarget(out target))
return target;
return null;
}
}

#endregion Target

#region IObserver<T> Members

#region OnCompleted

/// <summary>
/// Notifies the observer that the provider has finished sending push-based notifications.
/// </summary>
public void OnCompleted()
{
IObserver<T> target = Target;
if (target == null)
return;

target.OnCompleted();
}

#endregion OnCompleted

#region OnError

/// <summary>
/// Notifies the observer that the provider has experienced an error condition.
/// </summary>
/// <param name="error">An object that provides additional information about the error.</param>
public void OnError(Exception error)
{
IObserver<T> target = Target;
if (target == null)
return;

target.OnError(error);
}

#endregion OnError

#region OnNext

/// <summary>
/// Provides the observer with new data.
/// </summary>
/// <param name="value">The current notification information.</param>
public void OnNext(T value)
{
IObserver<T> target = Target;
if (target == null)
return;

target.OnNext(value);
}

#endregion OnNext

#endregion IObserver<T> Members
}

public class WeakObservable<T> : IObservable<T>
{
private readonly IObservable<T> _source;

#region Ctor

/// <summary>
/// Initializes a new instance of the <see cref="WeakObservable{T}"/> class.
/// </summary>
/// <param name="source">The source.</param>
/// <exception cref="System.ArgumentNullException">source</exception>
public WeakObservable(IObservable<T> source)
{
#region Validation

if (source == null)
throw new ArgumentNullException("source");

#endregion Validation

_source = source;
}

#endregion Ctor

#region Subscribe

/// <summary>
/// Subscribes the specified observer.
/// </summary>
/// <param name="observer">The observer.</param>
/// <returns></returns>
public IDisposable Subscribe(IObserver<T> observer)
{
IObservable<T> source = _source;
if (source == null)
return Disposable.Empty;
var weakObserver = new WeakObserver<T>(observer);
IDisposable disp = source.Subscribe(weakObserver);
return disp;
}

#endregion Subscribe
}

public static class WeakObservableExtensions
{
#region AsWeakObservable

/// <summary>
/// To weak observable.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="source">The source.</param>
/// <returns></returns>
public static IObservable<T> AsWeakObservable<T>(this IObservable<T> source)
{
return new WeakObservable<T>(source);
}

#endregion AsWeakObservable
}

you can check it with the following Unit Test:

[TestMethod]
public void WeakSubscription_Test()
{
// re-process a value when the stream ahead produce error

var scheduler = new TestScheduler();
var queue = new ConcurrentQueue<long>();

var source = Observable.Interval(TimeSpan.FromSeconds(1), scheduler);

var xs = source.AsWeakObservable().Subscribe(m => queue.Enqueue(m));

scheduler.AdvanceBy(TimeSpan.FromSeconds(10.5).Ticks);
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
scheduler.AdvanceBy(TimeSpan.FromSeconds(10.5).Ticks);

Assert.AreEqual(10, queue.Count);
GC.KeepAlive(source);
}

Add comment
facebook linkedin twitter email

Leave a Reply

Your email address will not be published.

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

*