Recommendation
this is a post recommendation about using Rx framework with WPF events,
I'm recommending this post from bobby's blog, the post is demonstrating the
use of creating observable stream from WPF event.
it is a short post with nice little code sample.
bobby did a real good job by wrapping the observable event (Observable.FromEvent)
into more human readable format (using T4 template).
Rx - for beginners (part 10): Concat expression

this post is the 10th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Concat expression.
the code for this post can be download from here.
What does it do?
the Concat expression is used to concatenate one Observable stream
into the end of another Observable stream.
once the first Observable stream is completed the the concat stream will be immediate subscribed.
the marble diagrams of Concat expression will look as follow:
the result observable stream will subscribe to the yStream after the xStream is completed,
therefore any values on the yStream before the subscription will be ignored.
in case of exception on any of the Concat Observable streams the marble diagrams will look as follow:
Code sample
the code for this post can be download from here.
Helper method for creating interval stream (based on System.Timers.Timer event):
Code Snippet
- static System.Timers.Timer s_timer = new System.Timers.Timer(1000);
-
- private static IObservable<string> CreateObservable(string prefix,TimeSpan stopAt)
- {
- var eventStream = Observable.FromEvent<ElapsedEventArgs>(s_timer,"Elapsed");
-
- var startTime = DateTime.Now;
- var stopTime = startTime + stopAt;
-
- Func<IEvent<ElapsedEventArgs>,bool> stopPredicate =
- value => value.EventArgs.SignalTime < stopTime;
- Func<IEvent<ElapsedEventArgs>,string> selectFormatter =
- value => prefix + (value.EventArgs.SignalTime - startTime).TotalSeconds.ToString("N0");
-
- var obs = eventStream.TakeWhile(stopPredicate).Select(selectFormatter);
-
- return obs;
- }
Line 1, create instance of the timer.
Line 3, the method get 2 parameters:
- prefix: which is used to format the stream output (line 13).
- stop at: which will define the stream lifetime duration.
line 5, create observable stream out of the timer elapsed event (we can use this technique on any .NET event).
lines 10-11, is the stop predicate which is used by the TakeWhile at line 15.
lines 12-13, is the output formatting which is used by the Select at line 15.
the main method is look as follow:
Code Snippet
- static void Main(string[] args)
- {
- var xs = CreateObservable("X", TimeSpan.FromSeconds(3.5));
- var ys = CreateObservable(" Y", TimeSpan.FromSeconds(10));
-
- xs.Subscribe (value => {/* do nothing */},
- () =>
- {
- Console.ForegroundColor = ConsoleColor.Gray;
- Console.WriteLine("xs Completed");
- Console.ForegroundColor = ConsoleColor.White;
- });
-
- var combineLatestStream = xs.Concat(ys);
-
- Console.ForegroundColor = ConsoleColor.White;
- combineLatestStream.Subscribe(value => Console.WriteLine(value),() => Console.WriteLine("Completed"));
- s_timer.Start();
- Console.ReadKey();
- }
line 3, create observable stream with lifetime period of 3.5 seconds.
lime 4, create observable stream with lifetime period of 10 seconds.
lines 6-12, subscribing the the complete event of the xs stream.
line 14, Concat the streams.
line 17, subscribe to to concat output stream.
line 18, start the timer so the streams will start to produce values.
the output will look as follow:
you may notice that Y4 value is missing.
the reason for it is that the complete event occur at the 4th timer elapse event,
and only then the subscription to the ys stream was taking place so nobody was yet listening
to the ys stream when the 4th timer elapsed.
Summary
Concat can join 2 observable stream, but unlike Merge
it doesn't listen on both streams simultaneously.
תגים של Technorati:
Rx,
IObservable,
IObserver
New release of Rx is available for download.
the Rx team has new release for .NET 3.5, .NET 4.0 RC, and Silverlight 3.
you can download the library from here.
the release includes some API changes,
some APIs has moved into different assemblies,
performance and bug fixes.
Highlights
- Moved IObservable & IObserver to seperate assembly: System.Observable.dll.
- Moved Schedulers to System.CoreEx.dll.
- Moved schedulers to System.Concurrency.
- Added schedulers to subjects.
- Removed deadlocks throughout Rx.
- Renamed Hide to AsObservable, add AsObserver, AsEnumerator.
- Added ObserveOn/SubscribeOn on IScheduler.
Release Notes
Build 1.0.2350.0 03/15/2010
- Added System.Observable to redist.txt.
- Changed Timeout to work on multiple notifications.
- Renamed HoldUntilChanged to DistinctUntilChanged.
- Made Semaphore for Silverlight internal.
- Adding Zip overload for IO<T> and IE<T> zipping.
- Changed last Throttle message to come out at Completion time and fix concurrency bug in Throttle.
- Changed OnErrorResumeNext to always complete.
- Changed Delay error semantics to be abort.
- Added ObserveOn/SubscribeOn on IScheduler.
- Fixed finally behavior.
- Minimized locks in subjects.
- Added MinBy, MaxBy operators as well as Min, Max with comparers.
- Added DistinctBy for Enumerable.
Build 1.0.2317.0 03/05/2010
- Moved to RC for .NET 4 build.
- Moved IObservable & IObserver to seperate assembly: System.Observable.dll.
- Removed IsCritical and its usage on exceptions thrown inside Rx operators.
- Fixed bugs in Enumerable->Observable conversions.
- Fixed threading bugs in EnumerableEx Merge and Amb operator.
- Changed signature of Buffer to expose an IList<T> as opposed to an IEnumerable<T>.
- Fixed Buffer behavior, with regards to flushing last frame and non-overlapping windows (count < skip).
- Renamed Buffer to BufferWithTime and BufferWithCount.
- Introduced BufferWithCount for IEnumerable.
- Renamed Hide to AsObservable, add AsObserver, AsEnumerator.
- Moved Schedulers & Disposables to System.CoreEx.
- Added schedulers to subjects.
- Enforced abort semantics for all exceptions in Rx.
- Added OfType and Cast to Observable, plus additional Do methods + parity in EnumerableEx.
- Reworked GroupBy to be lazy & performant.
- Removed deadlocks throughout Rx.
- Fixed reentrancy bug in Scheduler.
- Added RefCountDisposable.
- Removed LockDisposable.
- Added AsyncLock.
- Changed TimeSpan.MaxValue to not be a sentinel value for Infinite.
- Changed IScheduler argument positioning to be last where possible.
- Moved schedulers to System.Concurrency.
- Renamed Scheduler.Now to Scheduler.CurrentThread.
- Renamed Scheduler.Later to Scheduler.ThreadPool.
- Removed Scheduler.Default.
- Added Scheduler.Immediate and ImmediateScheduler.
- Added Scheduler.TaskPool and TaskPoolScheduler where Tasks are available.
- Added Scheduler.NewThread and NewThreadScheduler.
- Added SynchronizationContextScheduler.
- Added IConnectableObservable.
- Added ConnectableObservable.
- Removed EventSubject.
- Changed Publish/Replay/Prune to return IConnectableObservables.
- Added RefCount operator.
- Made all subjects not disposable.
- Added schedulers to asynchronous operations.
- Made Rx assemblies CLSCompliant.
- Added aggregate operators.
- Removed properties from Rx.
- Removed timedrift from time based operators.
- Changed default schedulers for operators.
- Cleaned up Generate and GenerateWithTime overloads.
- Renamed GroupDisposable to CompositeDisposable.
- Moved reference required overloads of extension methods to defining namespace.
- Many other small bug fixes.
- Removed BooleanSignal.
- Fixed timing bug in ControlScheduler.
תגים של Technorati:
IObservable,
IObserver,
Rx
Rx - for beginners (part 9): Hot Vs. Cold observable
this post is the 9th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Hot Vs. Cold observable stream.
the code for this post can be download from here.
If a tree falls in a forest and no one is around to hear it,
does it make a sound?
if it do make a sound when nobody observed it, we should mark it as hot,
otherwise it should be marked as cold.
What is Hot observable?
hot observable streams are streams that is active regardless whether or not
they are being observed.
such stream are the mouse move event or timer event.
What is Cold observable?
cold observable streams are streams that is activated upon subscription
(in most case new stream will be activate for each subscription).
the Observable.Interval is a sample for cold observable stream.
Code Sample
the code for this post can be download from here.
the following sample demonstrate the difference between hot and cold observable streams:
Code Snippet
- var timer = new System.Timers.Timer(1000);
- // create Cold observable stream (this stream will not project values
- // until the subscription, and will project new stream for each subscription)
- var coldStream = Observable.Interval(TimeSpan.FromSeconds(1));
-
- // create Hot observable stream out of .NET event
- var hotStream = Observable.FromEvent<ElapsedEventArgs>(timer,"Elapsed");
- DateTime startTime = DateTime.Now;
- timer.Start(); // start the timer (the timer will start producing values
- // regardless of the subscription)
- // convert the hot stream to stream that project the amount of seconds from the timer start
- var hotStreamInterval = hotStream.Select ( value => (value.EventArgs.SignalTime - startTime).TotalSeconds.ToString("N0"));
-
- Thread.Sleep(TimeSpan.FromSeconds(3)); // sleep for 3 seconds befor the subscription
- using (hotStreamInterval.Subscribe (value => Write("Hot: " + value, ConsoleColor.Green)))
- using(coldStream.Subscribe(value => Write("\t\tCold 1: " + value,ConsoleColor.Yellow)))
- {
- Thread.Sleep(TimeSpan.FromSeconds(2)); // sleep for 2 seconds befor the second subscription
- // second subscription
- using(coldStream.Subscribe(value => Write("\t\t\t\tCold 2: " + value,ConsoleColor.DarkYellow)))
- {
- Thread.Sleep(TimeSpan.FromSeconds(4)); // wait 4 second before disposal
- } // dispose hotStreamInterval and coldStream 2
- } // dispose hotStreamInterval and coldStream 1
line 1, create system timer with 1 second interval.
line 4, define cold observable stream with interval of 1 second.
line 7, creating hot observable stream out of the timer elapsed event (we can use this technique on any .NET event).
line 9, start the timer (because this is hot stream the timer will not wait for the subscription).
line 12, format the timer stream into more friendly observable stream that will project the timer duration in seconds.
line 14, sleeping 3 second before the subscription (this will make the difference between the hot streams that will
increase the value even though no body is listening and the cold stream that doesn't construct until the subscription).
line 15, subscribing to the hot observable stream (the using statement is the subscription scope).
line 16, first subscription to the cold observable stream.
line 18, sleeping before the second subscription to the cold stream.
line 20, second subscription to the cold observable stream (the second subscription will construct new observable stream).
line 22, waiting 4 second before leaving the subscription scope.
the output will look as follow:

- the hot timer observable stream (green) is starting with value = 3 because it is hot stream
which was producing value during the 3 second sleeping period - the first cold observable stream (yellow) is starting with value = 0 because the stream were constructed after
the sleep period. - the second cold observable stream (dark yellow) is starting with value = 0 because each subscription
is constructing new stream.
Summary

subscribing to hot observable stream will produce different result
from cold observable stream subscription, and it is important to
be aware for the nature of the stream you subscribing to.
תגים של Technorati:
Rx,
IObservable,
IObserver
Rx - for beginners (part 8): Combine Latest expression

this post is the 8th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Combine Latest expression.
the code for this post can be download from here.
What does it do?
like the zip expression the Combine Latest expression is used to synchronize
2 IObservable streams into single IObservable stream.
unlike the zip expression the Combine Latest doesn't using queue,
as it name suggest it only remember the latest value of each stream.
What is the combination strategy?
the strategy used by the Combine Latest is to observe both stream, and each time
it new value observed on either of the stream it combine it with the latest value observed on the other stream.
the marble diagrams of Combine Latest expression will look as follow:
we can see that when the y stream observed the b value, the result stream
combine it with the latest value observed on x stream (3) and latter the same value
will be combine into the result stream when the x stream will observed the 4 value.
When does it halt?
the Combine Latest processing will come to end either when one of the stream
will complete or throw exception.
Code sample
the code for this post can be download from here.
Helper method for creating interval stream:
Code Snippet
- private static IObservable<string> CreateObservable(
- string prefix,int stopAt, double secondsInterval)
- {
- var inteval = TimeSpan.FromSeconds(secondsInterval);
- var eventStream = Observable.Interval(inteval).
- TakeWhile(value => value < stopAt).
- Select(value => prefix + value.ToString());
-
- return eventStream;
- }
the method parameters include (line 2):
- prefix: just so we can distinguish values that was created by one stream from
values created on the other (see line 7). - stop at: define how many values will be produce by the stream before complete (see line 6).
- seconds interval: define the frequency that the stream produce values (see lines 4,5)
The main method is looking as follow:
Code Snippet
- static void Main(string[] args)
- {
- var xs = CreateObservable("X",15, 0.5);
- var ys = CreateObservable("Y",2, 2);
-
- var combineLatestStream = xs.CombineLatest(ys, (x,y) => x + " : " + y);
- combineLatestStream.Subscribe(value => Console.WriteLine(value),() => Console.WriteLine("Completed"));
- Console.ReadKey();
- }
line 3, create stream which will produce 15 values before completion at frequency of 0.5 seconds.
line 4, create stream which will produce 2 values before completion at frequency of 2 seconds.
line 6, is combine both streams, the Lamda is using to concatenate both stream's values.
The result will look as follow:
Summary
Combine Latest expression can be used to synchronize 2 streams,
while the output stream will get notified when ever either of the stream produce new value.
תגים של Technorati:
Rx,
IObservable,
IObserver
Calling WCF secured service from Java
recently I was working on exposing secured WCF service
to Java consumers.
I was responsible for the .NET side and a java expert named Tsvika
responsible for the Java side.
enabling secured conversation between Java and .NET using WCF is not a trivial task,
and it does needed some additional steps.
you should have certificate install, and having the binding and behaviors configured
in a way that the Java proxy can manage.
Certificates
the first step needed is having a valid certificate.
the certificate should be installed into the service's hosting machine.
you can learn more on how to handle the certificates in here.
Service configuration
Binding
the secure binding should look as follow
Code Snippet
- <wsHttpBinding>
- <binding name="UsernameAndPassword">
- <security mode="Message">
- <message clientCredentialType="UserName"
- negotiateServiceCredential="false"
- establishSecurityContext="false"
- algorithmSuite="Basic128"/>
- </security>
- </binding>
- </wsHttpBinding>
line 3, the binding mode should be Message
line 4, the client certificate type should be UserName.
line 5, the negotiation service credentials must switch off.
line 6, the establish security context should also turn off.
Behaviors
use the following behaviors
Code Snippet
- <serviceBehaviors>
- <behavior name="Bnaya.Samples.UsernameAndPassword">
- <serviceMetadata httpGetEnabled="true" policyVersion="Default" />
- <serviceCredentials>
- <serviceCertificate findValue="RPKey" storeLocation="LocalMachine"
- storeName="TrustedPeople" x509FindType="FindBySubjectName" />
- <userNameAuthentication userNamePasswordValidationMode="Custom"
- customUserNamePasswordValidatorType="Bnaya.Samples.MyUserNameValidator, WcfSecureed" />
- </serviceCredentials>
- </behavior>
- </serviceBehaviors>
lines 5-6, define which certificate should use for the service (RPKey is the name of the certificate).
in case of service hosting it is more reasonable to install the certificate on the machine level (instead of the user level) .
lines 7-8, define the authentication handler (as you can see in the following snippet).
Code Snippet
- public sealed class MyUserNameValidator : UserNamePasswordValidator
- {
- public override void Validate(string userName, string password)
- {
- if (userName != "admin" || password !="admin")
- throw new SecurityException("Access denied.");
- }
- }
the validate method is where your authentication code goes.
End point
nothing is special about the definition of the service section
Code Snippet
- <service name="Bnaya.Samples.Service1" behaviorConfiguration="Bnaya.Samples.UsernameAndPassword">
- <host>
- <baseAddresses>
- <add baseAddress="http://localhost:8731/Service1/"/>
- </baseAddresses>
- </host>
- <endpoint address="" binding="wsHttpBinding" contract="Bnaya.Samples.IService1"
- bindingConfiguration="UsernameAndPassword">
- <identity>
- <dns value="localhost"/>
- </identity>
- </endpoint>
- </service>
small tip: replace the localhost with the correct IP so when the Java proxy is generated,
the endpoint will be correct.
Java
java has several libraries that can invoke WCF, unfortunately not all of them
has good implementation for the WCF secured conversation.
the library that do working fine with secured conversation is Sun: Metro Web Services
and more precisely you should use WSIT.
do not waste your time on Axis2 / Rampart it is only half backed in compare to the Sun library.
Summary
this task is not travel, but with the right library and java expert like Yaakov,
it is certainly feasible.
תגים של Technorati:
WCF,
Java
Rx - for beginners (part 7): Zip expression
this post is the 7th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Zip expression.
the code for this post can be download from here.
What does it do?
the zip expression is used to synchronize 2 IObservable streams into single IObservable stream.
How does it do it?
it is taking the first observed value on either of the stream and wait for value from the other stream.
after both stream produce a value, the zip will project the value combination,
then it will wait for the next couple.
What happens when the stream frequency is not equals?
if one of the stream produce values more frequently than the other, those values will be queue in memory
and each time that the slower stream will produce a value, a single value will be fetched out of the fast stream's queue,
and combined with the slower stream's value.
that mean that you should be aware for possibly side effects, in terms of memory usages.
the marble diagrams of zip expression will look as follow:
When does it halt?
the zip processing will come to end either when one of the stream
will complete or when either of the stream will throw exception.
…..
actually it is a bit more complex,
when one of the stream complete, and still having value within the queued,
the complete notification will be put into the queue.
which mean that the actual complete will take place only the queue become dry
or when the other stream will notify completion.
the marble diagram for this scenario will look as follow:
if either of the stream is throwing exception the exception will immediately
project into the result stream and halt the zip operation.
Code sample
the code for this post can be download from here.
Helper method for creating interval stream:
Code Snippet
- private static IObservable<string> CreateObservable(
- string prefix,int stopAt, double secondsInterval)
- {
- var inteval = TimeSpan.FromSeconds(secondsInterval);
- var eventStream = Observable.Interval(inteval).
- TakeWhile(value => value < stopAt).
- Select(value => prefix + value.ToString());
-
- return eventStream;
- }
the method parameters include (line 2):
- prefix: just so we can distinguish values that was created by one stream from
values created on the other (see line 7). - stop at: define how many values will be produce by the stream before complete (see line 6).
- seconds interval: define the frequency that the stream produce values (see lines 4,5)
The main method is looking as follow:
Code Snippet
- static void Main(string[] args)
- {
- var xs = CreateObservable("X",5, 0.1);
- var ys = CreateObservable(" Y",3, 1);
-
- var zippedStream = xs.Zip (ys, (x, y) => x + " : " + y);
- zippedStream.Subscribe(value => Console.WriteLine(value),() => Console.WriteLine("Complete"));
- Console.ReadKey();
- }
line 3, create stream which will produce 5 values before completion at frequency of 0.1 seconds.
line 4, create stream which will produce 3 values before completion at frequency of 1 seconds.
line 6, is zipping both streams, the Lamda is using to concatenate both stream's values.
The result will look as follow:
Summary
Zip expression can be used to synchronize 2 streams,
but we should be carful of possible memory side effects.
תגים של Technorati:
Rx,
IObservable,
IObserver
Recommending on a blogger
I just discover a blogger that you may want to pay attention to:
his blog has some short tips on new little features that were added to VS 2010.
among his tips are:
want to read more?
register to his blog at http://blogs.msdn.com/wriju/default.aspx
Rx - for beginners (part 6): Merge expression
this post is the 6th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.

in this post we will focus on the Merge expression.
What does it do?
the merge expression is used to merge multiple IObservable streams into single IObservable streams.
the marble diagrams of merge expression will look as follow:
each value on the source streams is project into the result stream until all
the source streams complete.
if one of the stream raise exception the error will be project into the result stream
and dispose the subscriptions (the result stream will stop listening to any of the streams).
the marble diagram for this case will look as follow:
Code Sample
the code sample can be download from here.
the sample for this post is merging 3 streams that is running on different time interval
into one stream.
Code Snippet
- private static IObservable<string> CreateObservable(
- string prefix,int stopAt, double secondsInterval)
- {
- var inteval = TimeSpan.FromSeconds(secondsInterval);
- var eventStream = Observable.Interval(inteval).
- TakeWhile(value => value < stopAt).
- Select(value => prefix + value.ToString());
-
- return eventStream;
- }
the code is using the CreateObservable method in order to create each of the source stream.
line 5, Observable.Interval is creating stream with increasing value for each interval.
line 6, is stopping the after stopAt iterations.
line 7, formatting the value of the stream to string with prefix and the interval value.
the main method is looking as follow:
Code Snippet
- static void Main(string[] args)
- {
- var xs = CreateObservable("X",5, 0.3);
- var ys = CreateObservable(" Y",2, 1);
- var zs = CreateObservable(" Z",7, 0.6);
-
- //var mergedStream = xs.Merge(ys).Merge(zs);
- var mergedStream = Observable.Merge(xs,ys,zs);
- mergedStream.Subscribe(value => Console.WriteLine(value));
- Console.ReadKey();
- }
Lines 3-5, creating source streams.
line 8, creating the merged stream.
line9, subscribe to the merged stream.
the following is the output:
Summary
Merge is exactly what we expected, any value of the source streams will be project
into the result stream unless exception occurs.
תגים של Technorati:
Rx,
IObservable,
IObserver
Rx - for beginners (part 5): marble diagrams, select and where
this post is the 5th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.

this post will focus on marble diagrams which is used for visualizing IObservable stream.
on this post we will visualize the select and where clause, while in the
upcoming posts we will discuss other operations that can be
used to upon IObservable streams.
the code sample for this post is available here.
What should we visualize?
the event stream should visualize the following:
- on next
- on error
- on complete

How should we read the diagram?
marble diagram can have one or more horizontal line.
- each horizontal line present the timeline of single IObservable stream.
- the ellipse present single occurrence of new value event (on next).
- the vertical line present the end of the stream sequence (on complete).
- the X sign present exception (on error).
The hidden assumption used by the Rx framework is that the IObservable stream
should stop either on complete or on error.
How can we draw select clause using the marble diagram?
Code Snippet
- // Create IObservable that has increasing long value each second
- var timeStream = Observable.Interval(TimeSpan.FromSeconds(1));
-
- // invert the timeStream stream using Linq statement
- var negativeTimeStream = from value in timeStream
- select -value;
-
- negativeTimeStream.Subscribe(value => Console.WriteLine(value));
line 2, creating IObservable stream that increment the projected value each second.
line 5, is using Linq statement to convert the stream into negative value.
line 8, subscribe to the stream and write it to the console.
the result is shown in the following image.
the marble diagrams for the above snippet will look like the following diagram:
the marble diagrams show the construction of new IObservable stream using the select operation.
the vertical arrow present the transition from the source stream into the destination stream.
as the case with IEnumerable the select statement is extension method which
can be used on any IObservable stream, we will get same result for the following code:
Code Snippet
- var negativeTimeStream = timeStream.Select(value => -value);
Where clause marble diagram?
the where clause of the following snippet:
Code Snippet
- // taking only even values
- //var evenTimeStream = timeStream.Where(value => value % 2 == 0);
- var evenTimeStream = from value in timeStream
- where value % 2 == 0
- select value;
will be present:
Summary
marble diagrams is how stream manipulation can be visualize.
we already surveyed the select and where operations, and we will continue
with other operation in the upcoming posts.
Code Samples
the code sample for this post is available here.
תגים של Technorati:
Rx,
IObservable,
IObserver
Rx - for beginners (part 4): anonymous observer handler
this post is the 4th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
this post will focus on anonymous observer handler.
the code sample for this post can be download from here.
Anonymous Observer
as we mention on the previous posts, IObserver is used as a callback interface
which can be subscribe to the IObservable,
but this is a bit of overkill, when what we subscribe is relatively small (like Console.Write).
wouldn't it be nice to use Lamda expression instead?
doesn't we want to have something similar to button1.Click += (s,e)=>Console.WriteLine(…); .
IObservable extensions
using Lamda expression syntax in order to subscribe delegate into the IObservable is
available because IObservable got the following extension options for its Subscribe method
(under the hood we are speaking about extension methods).
- Action<TSource> onNext
- Action<TSource> onNext, Action onCompleted
- Action<TSource> onNext, Action<Exception> onError
- Action<TSource> onNext, Action<Exception> onError, Action onCompleted
so assuming we got IObservable instance called foo, the following code is perfectly OK:
- IDisposable unsubscribe = foo.Subscribe(value => Console.WriteLine(value));
for simple subscription only for the on next callback.
subscribing both for on next and on error will look as follow:
- IDisposable unsubscribe = foo.Subscribe(
- value => Console.WriteLine(value),
- exc => Console.WriteLine(exc));
what is actually happens is that under the hood the subscribe method creating
instance of IObserver, attaching the delegate, and hand it to the IObservable subscribe method.
Code Sample
I took the previous post's sample, which look as follow:
Code Snippet
- class Program
- {
- static void Main(string[] args)
- {
- var obs = new Observer();
- var observable = new FakeObservableFeeder();
- IDisposable unsubscribe = observable.Subscribe(obs);
- Console.ReadKey();
- unsubscribe.Dispose();
- }
- private class Observer : IObserver<int>
- {
- void IObserver<int>.OnCompleted()
- {
- Console.WriteLine("Done");
- }
- void IObserver<int>.OnError(Exception exception)
- {
- Console.WriteLine("Error: " + exception.Message);
- }
- void IObserver<int>.OnNext(int value)
- {
- Console.WriteLine(value);
- }
- }
- }
you can see at line 11, that we implementing IObserver with code one line
for each methods (Console.WriteLine ).
and at line 7, we subscribing this observer into the IObservable instance.
Code Sample using anonymous observer
this can be match simpler using anonymous observer, as shown in the following snippet:
Code Snippet
- class Program
- {
- static void Main(string[] args)
- {
- var observable = new FakeObservableFeeder();
- // subscribing handlers (Anonymous observer)
- IDisposable unsubscribe = observable.Subscribe(
- value => Console.WriteLine(value),
- exc => Console.WriteLine(exc),
- () => Console.WriteLine("Done"));
- Console.ReadKey();
- unsubscribe.Dispose();
- }
- }
definitely less code was written.
you can see the subscription at line 7.
- line 8, is handling the on next action.
- line 9, is handling the on error action.
- line 10, is handling the on complete action.
Summary
what we have seen is that simple action can be subscribed using anonymous observer handlers.
תגים של Technorati:
IObservable,
IObserver,
Rx
MEF Preview 9 was released
MEF preview 9 changes will be reflected the in the release of .NET 4.0 and Silverlight 4.0.
except from bug fixing, there was some changes to the API,
most of the changes is related to System.ComponentModel.Composition.Initilization.dll,
which is not yet available only for none Silverlight application :-(
here is a short list of API changes:
-
PackageCatalog were brought back, and changed it name to DeploymentCatalog.
-
PartCreator was renamed to ExportFactory.
-
PartInitializer was renamed to CompositionInitializer
-
CompositionHost.InitializeContainer was renamed to CompositionHost.Initialize
i do hope that the System.ComponentModel.Composition.Initilization.dll will finally find it place
into the CLR 4.0 release.
you can download it from the MEF codeplex.
the information was taken from Wes' Puzzling Blog.
UPDATES:
From: Glenn block
Unfortunately we will not be shipping CompositionInitializer / Host as part of .NET 4.0. .NET 4.0 and SL are on different ship cycles and the desktop locked long before we had the functionality in SL. We will do an OOB release on codeplex for desktop. I currently have an early version on my SkyDrive, but we will get bits updated for codeplex. The plan is to role CI into the desktop in the future.
Rx - for beginners (part 3): IObservable Vs. IEnumerable
this post is the 3rd in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
this post will focus on how exactly the IObservable/IObserver mirror IEnumerable/IEnumerator.
IEnumerator operations
IEnumerable expose the following operations:
Code Snippet
- public interface IEnumerator<T> : IDisposable, IEnumerator
- {
- T Current { get; }
- }
-
- public interface
- {
- object Current { get; }
- bool MoveNext();
- void Reset();
- }
- Move next: indicate whether the operation completed
(no more item available on the item source).
and it may also throw Exception if something have got wrong. - Current: is handing the current item.
- Reset: is restarting the iteration.
IObserver mirrored operations
IObserver is mirroring IEnumerator, instead of asking the IEnumerator,
IObserver will tell you what's is happening.
this is known as "don't call us we will call you" :-)
Code Snippet
- public interface IObserver<T>
- {
- void OnCompleted();
- void OnError(Exception exception);
- void OnNext(T value);
- }
the IObserver is actually a callback interface which will be called by the items feeder,
whenever something occurs.
- On complete: is mirroring move next, by notifying that the feeding source
has no more items. - On error: is also mirroring move next, but this time it mirror the move next throw exception behavior.
- On next: is mirroring the current property, by pushing new value into
the the observer OnNext implementation. - because of the push nature of the IObserver the reset does not mirrored.
IEnumerable Operations
Code Snippet
- public interface IEnumerable<T> : IEnumerable
- {
- IEnumerator<T> GetEnumerator();
- }
IEnumerable has one method that return IEnumerator with can be use
to iterate through the items by pulling the items sequentially.
IObservable mirrored operations
Code Snippet
- public interface IObservable<T>
- {
- IDisposable Subscribe(IObserver<T> observer);
- }
- through IObservable we can subscribe to content feeders by handing
implementation IObserver, then the feeder can use the IObserver interface
for each of the operation we discussed earlier. - the subscribe operation return IDisposable instance that is used
for unsubscribe the observer from the feeder.
this is match safer then using unsubscribe method because it doesn't has side effects,
just think of what's happens when you subscribe anonymous delegate.
in fact the disposable instance is mirroring IEnumerator<T> which is inheriting from IDisposable.
Summary
by mirroring IEnumerable/IEnumerator, IObservable/IObserver can change the data
flow direction from pull to push.
Code Sample
you can find code sample that implement simple scenario of feeder
that feed increasing integer sequence as long as the integer value is below 100.
the solution contain 2 projects, one implement the scenario using enumerable and the
other is using observable.
the code sample can be download from here.
תגים של Technorati:
IObservable,
IObserver,
Rx
Rx - for beginners TOC
Also available
תגים של Technorati:
Iobservable,
Iobserver,
Rx
Rx – Code cartoon
the following cartoon emphasis the Rx – IObservable push nature against the
pull nature of IEnumerable.
Summary
Reactive approach can reduce your phone bill :-)
תגים של Technorati:
IObservable,
IObserver,
Rx
More Posts
Next page »