March 2010 - Posts
Rx - for beginners (part 13): Publish (broadcast to many subscribers)
this post is the 13th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Publish operator.
the code for this post available here.
let guess how many times the select statement will be invoke for the following code,
the underline stream will produce 2 value (0, 1)?
Code Snippet
- IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));
-
- var producer = from item in observableRoot
- where item < 2
- select item;
-
- producer.Subscribe(value => Console.WriteLine("A: " + value));
- producer.Subscribe(value => Console.WriteLine("B: " + value));
the answer is 4 times, actually the select statement will be invoked for each of the subscription (in this sample we having 2 on lines 7-8),
each time that the producer (observable) will produced value.
in order to emphasis the problem I did some minor changes to the Linq query:
Code Snippet
- IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));
-
- var producer = observableRoot.Where(item => item < 2);
- producer = producer.Select(item =>
- {
- Console.WriteLine("X");
- return item;
- });
-
- producer.Subscribe(value => Console.WriteLine("A: " + value));
- producer.Subscribe(value => Console.WriteLine("B: " + value));
as you can see at line 3, we doing the where part,
and at lines 4-8 we doing the select part + writing X to the console each time that the select occurs.
the output for this code will look as follow:
So we do have a problem!
in lot of cases we would like to broadcast the same value for any of the registered subscriber without
recalculating different value for each of the subscription.
How can we do it using the Rx framework?
the solution is to separate the subscriber from the underline producer stream?
the separation is done in 2 steps:
step one, isolating from the underline stream, it is done by using the Publish operator.
the publish operator return non active IConnectableObservable which mean that
subscribers can subscribe to this producer isolation, but the producer isolation
does not yet listening to the underline stream.
step two, connecting to the underline stream by using Connect operator.
the code will look as follow:
Code Snippet
- IObservable<long> observableRoot = Observable.Interval(TimeSpan.FromSeconds(1));
-
- var producer = observableRoot.Where(item => item < 2);
- producer = producer.Select(item =>
- {
- Console.WriteLine("X");
- return item;
- });
-
- IConnectableObservable<long> producerAbstraction = producer.Publish();
-
- producerAbstraction.Subscribe(value => Console.WriteLine("A: " + value));
- producerAbstraction.Subscribe(value => Console.WriteLine("B: " + value));
-
- producerAbstraction.Connect();
nothing changed till line 10.
line 10, creation of IConnectableObservable abstraction (producerAbstraction) (which is not active yet).
lines 12-13, is similar to the previous subscription, but now the subscription is for the abstraction layer.
of producerAbstraction (instead of subscribing to the underline stream).
line 15, connect the producerAbstraction to the underline producer stream (only at this point values will start
flowing into the subscribers callbacks).
the output will be:
as you can see, no duplicate select occurs.
Summary
you should be aware of the subscription behavior, and apply the right pattern that
match your needs.
the code for this post available here.
this post is the 11th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on Rx for JavaScript.
the recently the Rx team release JavaScript library that capable to
get observable stream from events.
for example the syntax for mouse move event will be:
- var mouseMove = Rx.Observable.FromJQueryEvent($(document), "mousemove");
all you have to do in order of using this library is to rx.js which is less than 7Kb (GZipped).
- <script src="rx.js" type="text/javascript"></script>
you can download the installation from here.
the installation include sample code that demonstrate old good JavaScript trick of
attacking sting tail to the mouse move event.
to see the effect move the mouse on the gray panel.
Rx on javascriptRx on javascriptRx on javascript
the full code for this trick is:
Code Snippet
- <html xmlns="http://www.w3.org/1999/xhtml" >
- <head>
- <title>Rx for JavaScript Rocks!</title>
- <script src="http://code.jquery.com/jquery-latest.js"></script>
- <script src="rx.js" type="text/javascript"></script>
- <script type="text/javascript">
- $(document).ready(function()
- {
- var mouseMove = Rx.Observable.FromJQueryEvent($(document), "mousemove");
-
- var text = "time flies like an arrow";
- var container = $("#container");
- for (var i = 0; i < text.length; i++)
- {
- (function(i)
- {
- var s = $(document.createElement("span"));
- s.html(text.charAt(i));
- s.css({ position: "absolute" });
- s.appendTo(container);
-
- mouseMove.Delay(i * 100).Subscribe(function(mouseEvent)
- {
- s.css({ top: mouseEvent.clientY + "px",
- left: mouseEvent.clientX + i * 10 + 15 + "px"});
-
- });
- })(i);
- }
- });
- </script>
- </head>
-
- <body style="font-family: Consolas, monospace; overflow: hidden">
- <div id="container"></div>
- </body>
- </html>
line 9, getting observable stream.
line 13, iterate for each of the character of the text.
lines 22-27, subscribe to the event stream (for each of the character).
Summary
the Rx framework is now available for web developers.
Learn More
Rx - for beginners (part 11): ObserveOn
this post is the 11th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the ObserveOn…
the code sample for this post can be found here.
sometime it is important that the observation will occurs on specific thread.
for example GUI application like (WinForm, WPF and Silverlight) throw exception whenever
UI component accessed on thread different than its creation thread.
ObserveOn instruct the producer (observable) to call the consumer (observer) on specific thread.
having the following Xaml:
Code Snippet
- <Window x:Class="ObserveOnSample.Window1"
- xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
- xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
- Title="Window1" Height="300" Width="300">
- <Grid>
- <TextBlock x:Name="_txt" Text="Not set"/>
- </Grid>
- </Window>
if we want to change the Text property of the text box, we mast do it on the Dispatcher thread.
doing it we have to add the ObserveOnDispather, as shown in the next snippet (line 8):
Code Snippet
- public partial class Window1:Window
- {
- public Window1()
- {
- InitializeComponent();
-
- var timeStream = Observable.Interval(TimeSpan.FromSeconds(1));
- var timeStreamDispather = timeStream.ObserveOnDispatcher();
- timeStreamDispather.Subscribe(value => _txt.Text = value.ToString());
- }
- }
the sample create interval producer (observable) on line 7.
then it ensure that observation will occurs at the dispatcher thread (line 8).
and subscribe to the synchronized producer (observable) on line 9.
Summary
you can use Rx to synchronize your operation to the right thread.
the code sample for this post can be found here.
Chess - Deterministic parallel testing
modern software development is drifting in the direction of Parallel Computing.
even those the tools and libraries for Parallel Computing is continuously improving,
we still left with the old problem of how to test the non deterministic execution nature
of Parallel Computing programming.
this post will focus on the Chess testing framework, which is trying to solve the
non deterministic testing issue.
you can download the chess framework from here.
the code for this post is available here.
Why does load testing isn't enough?
common practice for testing Parallel Computing is to use load test.
this practice is a good one and shouldn't be abandoned,
but it has some profound weaknesses in terms of:
- reproducing the bugs.
- and the ability to guaranty the optimal coverage of thread switching scenarios,
sometimes the rare switching timing will never happens during the load testing,
but they are exactly the one that will happens on the customer production environment.
What does Chess bring to the table?
as analogy I can describe the load testing as statistic weapon,
while the Chess can be describe as smart accurate weapon.
Chess will run the test code in loop, each time it will control the
context switching (through custom scheduler) in a way that it will be fully deterministic
and therefore can be reproduce.
this way Chess will gain a better thread switching coverage which will lead for better
code quality (and it will required shorter execution time, because it doesn't have to run for days to fill up the statistic).
How to define Chess test?
Chess test is define as regular MS Test with one additional attribute
Code Snippet
- [TestMethod()]
- [HostType("Chess")]
- public void ParallelTest()
- {
- MyLogic target = new MyLogic();
- decimal expected = target.Read();
- var trd = new Thread (() => target.Deposit(500));
- trd.Start();
- target.Withdraw(500);
- trd.Join();
- decimal actual = target.Read();
- Assert.AreEqual(expected,actual);
- }
line 2, decorate the test for running under Chess.
we already saw this technique with Moles.
running the test without the Chess hosting will result with success 99.9% of the times.
while adding Chess hosting, the test will consistently fail each time it run.
if you want to see what Chess is doing during the test execution
you can add the following attribute
[TestProperty("ChessDebug","true")] (as shown in line 3)
Code Snippet
- [TestMethod()]
- [HostType("Chess")]
- [TestProperty("ChessDebug","true")]
- public void ChessParallelTest()
- {
- ...
- }
How to reproduce failures?
when ever Chess test execution failed, the test failure page supply useful information
of how to create reproduced failure test (it is recommended to create new test for reproducing
because this test will not goes though all the steps the original Chess test goes, and we
don't want to loose any of those steps).
the reproduced test decoration will look as follow:
Code Snippet
- [TestMethod()]
- [HostType("Chess")]
- [TestProperty("ChessDebug","true")]
- [TestProperty("ChessMode","Repro")]
- [TestProperty("ChessBreak","BeforePreemption,AfterPreemption")]
- #region ChessScheduleString (not human readable)
- [TestProperty("ChessScheduleString",@"bpilaiaaaaaaaaaaaeaaonlnahgabmejjgcfcgcpgnmkhlhpekpfeknhoahekbaiiagabdcenijaeabaommbiimnogjcombngjeh
- cdcjklckibmkgffggffnggbgeammonjnlmphnohloplnphnohloplnphlkdljneochphnpppdpfmgggeabgmpgmoeknkmjjocbia
- kkmibpdphohmbpdpcchomnfpodnhpidfpappbpndjpppphkpcjdpmnoplpifpopoglpmfkpmdpplplkpkdlpodldfpnhplnnmaij
- lfpplfpepplpjopooeopcchnjjppjlohpppohpaannppfghgkfaaaaaa")]
- #endregion
- public void ReproduceChessParallelTest()
- {
- ...
- }
lines 6-11, are non user readable decoration, which is the Chess instruction for
reproducing the failure (setting the right context switching).
line 4, is setting the reproduce mode.
line 5, is not mandatory but it is one of the great Chess feature,
it is actually instructing Chess to place break points at the right spots, so you no longer
have to search in the darkness for the right place and time for your break point.
obviously this save tones of debugging hours.
Summary
as you do write parallel coding, it will be wise to add Chess into your
testing toolbox, for gaining better deterministic of the parallel test result.
Chess is a powerful tool which will lead your products to better quality and shorter
debug efforts.
one last issue is that currently there is no bits compatible with VS 2010 RC so we
will have to be patient till we got it.
Downloads and Learn More
you can download chess from here.
the code for this post is available here.
the Chess web page is available here.
Channel 9 talk is available here.
Chess at the PDC is available here.
Pex – test input generator

this post will focus on the Pex tool.
Pex is an automatically test input generates.
it try to get both high code coverage and potential failures.
you can download Pex from here.
So why do we need another testing tool?
Pex does not intend to replace any of the existing testing frameworks,
in matter of fact, its generated test can be saved as testing code using any of the leading
testing frameworks.
When to use Pex?
i guess it would be smart to use Pex after you wrote your unit tests (using your unit test framework of choice),
but before you actually sign the feature as done.
Which tests would you write for the following code snippet?
Code Snippet
- public decimal Divide(decimal a,decimal b)
- {
- return a / b;
- }
-
- public double Divide(double a,double b)
- {
- return a / b;
- }
this is a fairly small code so i guess you can think of most of the relevant tests,
even with out the Pex help :-)
let see which tests will be chosen by Pex.
How to run Pex
after installing Pex, you can run Pex using the right click context menu.
doing so, Pex will try to understand the code and figure out which test are relevant,
in terms of code coverage and failure potentials.
for the decimal dividing Pex figure out the following tests :
as you can see (under columns a, b columns) the input values that was chosen by Pex is not a random ones.
it chose 0, decimal.MaxValue and decimal.MinValue.
we can see that 2 of the test were actually result with exceptions.
Saving Pex result as tests
one of the great feature of Pex is that it allow us to save the test results as unit test,
(by default it is using VS testing framework but you can change it using its setting,
you can find new tab under the VS tools-> options menu named Pex).
saving is very simple task. all you have to do is select one or more Pex test line, and press the save button
(Pex will generate test project for you if needed).
How to modify Pex generated unit tests?
Pex generated unit tests are build using 2 layers:
- the layer of the unit testing framework (N-Unit, VS Test, MB-Unit, ext…),
which is actually a facade that target the second layer (do not change code on this layer
because Pex code generation may override it). - the second layer called parameterized test and this is where you want to add assertions or
even writing more parameterized tests and ask Pex the generate unit test for those custom tests.
this is how the layers look in the VS solution explorer:
Pex suggestions
final feature for this post is the suggestion window.
Pex will supply suggestion for making your code better.
which is looking as follow:
Summary
Pex is a great tool for making our code better,
we did not cover all of Pex capability so check the learn more section to get more on this topic.
Learn more
recommending VS 2010 XAML IntelliSense Extension
a pal named Karl had published a very useful plug-in to VS 2010.
this plug-in is making Xaml writing so much better experience with enhanced IntelliSense.
if you doing some Xaml work using VS 2010 you should consider to download this plug-in.
the download is available from the following link:
http://karlshifflett.wordpress.com/2010/03/21/visual-studio-2010-xaml-editor-intellisense-presenter-extension/#comment-2228
Features
- Pascal case lookup
- Optional narrowing list filter
- Filtering based on item type
- xmlns IntelliSense options
- Show/hide only solution assemblies
- Show/hide schemas
- Filter settings persist for the current Visual Studio session


Moles – isolate your test unit
this post will focus on the Moles - Isolation framework for .NET.
you can download the Moles framework from here, or even better
download the Pex testing framework (that includes Moles) from here.
(I will survey the Pex framework in one of the upcoming posts).
the code sample for this post is available here.
How do you test the correctness of the following method?
Code Snippet
- public class MyComponent
- {
- private DateTime _lastHit = DateTime.MinValue;
-
- public bool IsActive()
- {
- if(_lastHit.AddMinutes(20) > DateTime.Now)
- return false;
- _lastHit = DateTime.Now;
- return true;
- }
- }
as you can see in line 7, IsActive will be activate only once every 20 minutes.
so should the test wait 20 minutes as shown in the next snippet (line 11):
Code Snippet
- [TestMethod()]
- public void IsActiveFlowOverDurationTest()
- {
- var target = new MyComponent();
- bool actual;
- actual = target.IsActive();
- Assert.AreEqual(true,actual);
- actual = target.IsActive();
- Assert.AreEqual(false, actual);
-
- Thread.Sleep(TimeSpan.FromMinutes(20.1));
-
- actual = target.IsActive();
- Assert.AreEqual(true,actual);
- }
Moles
the Moles framework is giving us the ability to isolate our tests,
by faking Microsoft, 3rd party or even our own dependencies.
I will start with showing the code of our Moles test, and proceed with
some explanation.
Code Snippet
- [TestMethod()]
- [HostType("Moles")]
- public void IsActiveFlowOverDurationTest()
- {
- var target = new MyComponent();
- bool actual;
- actual = target.IsActive();
- Assert.AreEqual(true,actual);
- actual = target.IsActive();
- Assert.AreEqual(false, actual);
-
- var moleTime = DateTime.Now.AddMinutes(20.1);
- MDateTime.NowGet = () => moleTime;
- //Thread.Sleep(TimeSpan.FromMinutes(20.1));
-
- actual = target.IsActive();
- Assert.AreEqual(true,actual);
- }
what's has been changed from our previous test method is the attribute on line 2
(which is a decoration for the Moles framework) .
and lines 12-13 which replacing line 14.
as you can see no waiting is involve.
another important line that we should add to the assembly level is:
- [assembly: MoledType(typeof(System.DateTime))]
which define that we molding the DateTime type.
How to add Moles?
adding Moles is very simple, at the testing project add new item and select "Moles and Stubs for Testing"
it is very important to name the item after the name of the assembly that you want to fake.
Back to the test method
as we saw earlier only 2 line had changed in our test method:
Code Snippet
- var moleTime = DateTime.Now.AddMinutes(20.1);
- MDateTime.NowGet = () => moleTime;
as you can see line 1 has nothing to do with Moles.
line 2, is faking the DateTime.Now.
the faked type name is starting M (this is how Moles work and the type is available
because we added the Moles item into the testing project),
and when ever we are faking property, Get or Set will be add as suffix of the property name.
so that how DateTime.Now become MDateTime.NowGet.
to supply alternative value all you have to do is to assign a delegate
which returns the faked value.
Summary
I didn't got into the subject of test design, but Moles is giving us fairly powerful
framework which is capable of faking CLR or 3rd party types.
you may want to take Moles farther to file system or database access.
as I always say do not wait for your test, let them wait for you :-)
beyond the DateTime sample Moles is a great way for isolating your testing
unit from their dependencies (and of course that there are many other god and
sometime better way to do that isolation, like mocking, Mef, Ioc and more).
Resources
http://channel9.msdn.com/posts/Peli/Moles-Replace-any-NET-method-with-a-delegate/
http://social.msdn.microsoft.com/Forums/en-US/pex/threads/
תגים של Technorati:
Moles,
Test,
UnitTest,
Pex
MEF for Beginner (Import from Xaml) - part 11
this is the 11th post of the MEF for Beginner series, the series TOC is available here.
this post will focus on Importing mef parts directly from the Xaml.
the code sample for this post is available here.
assuming that we have the following exports:
Code Snippet
- class DemoStrings
- {
- [Export("MyTag")]
- public string Text1 { get { return "Hello world"; } }
- [Export("MyTag")]
- public string Text2 { get { return "export using custom attribute"; } }
- [Export("NotMyTag")]
- public string Text3 { get { return "not included (Not My Tag contract)"; } }
- [Export("MyTag")]
- public string Text4 { get { return "Wpf markup extension"; } }
- [Export]
- public string Text5 { get { return "not included (have no contract)"; } }
- }
we may want to have it import into the Xaml as follow:
Code Snippet
- <Window x:Class="Bnaya.Samples.MainWindow"
- xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
- xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
- xmlns:mef="clr-namespace:Bnaya.Samples"
- Title="MainWindow" Height="350" Width="525">
- <Grid>
- <ListBox
- DataContext="{mef:ImportManyStrings MyTag}"
- ItemsSource="{Binding}"/>
- </Grid>
- </Window>
as you can see at line 8 the data context is using special definition for importing all exported strings that
define the their contract as MyTag (looking at the exports it will import the exports at lines 4,6,10).
what should we have in order to achieve that functionality?
actually very little, all we have to do is to define a markup extension that return the exported parts.
the following code is how the markup extension should be define:
Code Snippet
- [MarkupExtensionReturnType(typeof(IEnumerable<string>))]
- public class ImportManyStringsExtension : MarkupExtension
- {
- private readonly IEnumerable<string> _dataSource;
- public ImportManyStringsExtension(string contract)
- {
- _dataSource = CompositionHost.Provider.GetExportedValues<string>(contract);
- }
-
- public override object ProvideValue(IServiceProvider serviceProvider)
- {
- return _dataSource;
- }
- }
line 1, decorate that our markup extension for returning IEnumerable<string>.
line 2, derive from MarkupExtension.
line 7, at the construction time, dynamically ask mef for any discoverable parts that follow the contract.
the CompositionHost is a very small helper class that was taken from the MEF Silverlight implementation
with some very minor enhancements.
lines 10-13, returning the discoverable parts to the Xaml.
One last thing
we should remove the StartupUri from the App.Xaml and using the OnStartup override instead:
Code Snippet
- protected override void OnStartup(StartupEventArgs e)
- {
- base.OnStartup(e);
-
- if (!Utils.IsInDesignTool)
- {
- var asmCatalog = new AssemblyCatalog(typeof(App).Assembly);
- var catalogs = new AggregateCatalog(asmCatalog);
- var container = new CompositionContainer(catalogs);
- CompositionHost.Initialize(container);
- container.Compose(new CompositionBatch());
-
- var root = new MainWindow();
- root.ShowDialog();
- }
- }
line 5, preventing MEF operation during design time.
lines 7-9, setting the MEF container.
line 10, setting the MEF container as the main container of the application
(this way it can be reachable anywhere).
line 11, do the composition,
IMPOERANT: the composition should happens before the window instantiation,
because that when the markup extension will be invoke.
the exported part should be available at this time.
lines 13-14, starting the application.
Summary
we can extend the Xaml very easily in order to get better Xaml->Mef experience.
the code sample for this post is available here.
Recommendation
this is a post recommendation about using Rx framework with WPF events,
I'm recommending this post from bobby's blog, the post is demonstrating the
use of creating observable stream from WPF event.
it is a short post with nice little code sample.
bobby did a real good job by wrapping the observable event (Observable.FromEvent)
into more human readable format (using T4 template).
Rx - for beginners (part 10): Concat expression

this post is the 10th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Concat expression.
the code for this post can be download from here.
What does it do?
the Concat expression is used to concatenate one Observable stream
into the end of another Observable stream.
once the first Observable stream is completed the the concat stream will be immediate subscribed.
the marble diagrams of Concat expression will look as follow:
the result observable stream will subscribe to the yStream after the xStream is completed,
therefore any values on the yStream before the subscription will be ignored.
in case of exception on any of the Concat Observable streams the marble diagrams will look as follow:
Code sample
the code for this post can be download from here.
Helper method for creating interval stream (based on System.Timers.Timer event):
Code Snippet
- static System.Timers.Timer s_timer = new System.Timers.Timer(1000);
-
- private static IObservable<string> CreateObservable(string prefix,TimeSpan stopAt)
- {
- var eventStream = Observable.FromEvent<ElapsedEventArgs>(s_timer,"Elapsed");
-
- var startTime = DateTime.Now;
- var stopTime = startTime + stopAt;
-
- Func<IEvent<ElapsedEventArgs>,bool> stopPredicate =
- value => value.EventArgs.SignalTime < stopTime;
- Func<IEvent<ElapsedEventArgs>,string> selectFormatter =
- value => prefix + (value.EventArgs.SignalTime - startTime).TotalSeconds.ToString("N0");
-
- var obs = eventStream.TakeWhile(stopPredicate).Select(selectFormatter);
-
- return obs;
- }
Line 1, create instance of the timer.
Line 3, the method get 2 parameters:
- prefix: which is used to format the stream output (line 13).
- stop at: which will define the stream lifetime duration.
line 5, create observable stream out of the timer elapsed event (we can use this technique on any .NET event).
lines 10-11, is the stop predicate which is used by the TakeWhile at line 15.
lines 12-13, is the output formatting which is used by the Select at line 15.
the main method is look as follow:
Code Snippet
- static void Main(string[] args)
- {
- var xs = CreateObservable("X", TimeSpan.FromSeconds(3.5));
- var ys = CreateObservable(" Y", TimeSpan.FromSeconds(10));
-
- xs.Subscribe (value => {/* do nothing */},
- () =>
- {
- Console.ForegroundColor = ConsoleColor.Gray;
- Console.WriteLine("xs Completed");
- Console.ForegroundColor = ConsoleColor.White;
- });
-
- var combineLatestStream = xs.Concat(ys);
-
- Console.ForegroundColor = ConsoleColor.White;
- combineLatestStream.Subscribe(value => Console.WriteLine(value),() => Console.WriteLine("Completed"));
- s_timer.Start();
- Console.ReadKey();
- }
line 3, create observable stream with lifetime period of 3.5 seconds.
lime 4, create observable stream with lifetime period of 10 seconds.
lines 6-12, subscribing the the complete event of the xs stream.
line 14, Concat the streams.
line 17, subscribe to to concat output stream.
line 18, start the timer so the streams will start to produce values.
the output will look as follow:
you may notice that Y4 value is missing.
the reason for it is that the complete event occur at the 4th timer elapse event,
and only then the subscription to the ys stream was taking place so nobody was yet listening
to the ys stream when the 4th timer elapsed.
Summary
Concat can join 2 observable stream, but unlike Merge
it doesn't listen on both streams simultaneously.
תגים של Technorati:
Rx,
IObservable,
IObserver
New release of Rx is available for download.
the Rx team has new release for .NET 3.5, .NET 4.0 RC, and Silverlight 3.
you can download the library from here.
the release includes some API changes,
some APIs has moved into different assemblies,
performance and bug fixes.
Highlights
- Moved IObservable & IObserver to seperate assembly: System.Observable.dll.
- Moved Schedulers to System.CoreEx.dll.
- Moved schedulers to System.Concurrency.
- Added schedulers to subjects.
- Removed deadlocks throughout Rx.
- Renamed Hide to AsObservable, add AsObserver, AsEnumerator.
- Added ObserveOn/SubscribeOn on IScheduler.
Release Notes
Build 1.0.2350.0 03/15/2010
- Added System.Observable to redist.txt.
- Changed Timeout to work on multiple notifications.
- Renamed HoldUntilChanged to DistinctUntilChanged.
- Made Semaphore for Silverlight internal.
- Adding Zip overload for IO<T> and IE<T> zipping.
- Changed last Throttle message to come out at Completion time and fix concurrency bug in Throttle.
- Changed OnErrorResumeNext to always complete.
- Changed Delay error semantics to be abort.
- Added ObserveOn/SubscribeOn on IScheduler.
- Fixed finally behavior.
- Minimized locks in subjects.
- Added MinBy, MaxBy operators as well as Min, Max with comparers.
- Added DistinctBy for Enumerable.
Build 1.0.2317.0 03/05/2010
- Moved to RC for .NET 4 build.
- Moved IObservable & IObserver to seperate assembly: System.Observable.dll.
- Removed IsCritical and its usage on exceptions thrown inside Rx operators.
- Fixed bugs in Enumerable->Observable conversions.
- Fixed threading bugs in EnumerableEx Merge and Amb operator.
- Changed signature of Buffer to expose an IList<T> as opposed to an IEnumerable<T>.
- Fixed Buffer behavior, with regards to flushing last frame and non-overlapping windows (count < skip).
- Renamed Buffer to BufferWithTime and BufferWithCount.
- Introduced BufferWithCount for IEnumerable.
- Renamed Hide to AsObservable, add AsObserver, AsEnumerator.
- Moved Schedulers & Disposables to System.CoreEx.
- Added schedulers to subjects.
- Enforced abort semantics for all exceptions in Rx.
- Added OfType and Cast to Observable, plus additional Do methods + parity in EnumerableEx.
- Reworked GroupBy to be lazy & performant.
- Removed deadlocks throughout Rx.
- Fixed reentrancy bug in Scheduler.
- Added RefCountDisposable.
- Removed LockDisposable.
- Added AsyncLock.
- Changed TimeSpan.MaxValue to not be a sentinel value for Infinite.
- Changed IScheduler argument positioning to be last where possible.
- Moved schedulers to System.Concurrency.
- Renamed Scheduler.Now to Scheduler.CurrentThread.
- Renamed Scheduler.Later to Scheduler.ThreadPool.
- Removed Scheduler.Default.
- Added Scheduler.Immediate and ImmediateScheduler.
- Added Scheduler.TaskPool and TaskPoolScheduler where Tasks are available.
- Added Scheduler.NewThread and NewThreadScheduler.
- Added SynchronizationContextScheduler.
- Added IConnectableObservable.
- Added ConnectableObservable.
- Removed EventSubject.
- Changed Publish/Replay/Prune to return IConnectableObservables.
- Added RefCount operator.
- Made all subjects not disposable.
- Added schedulers to asynchronous operations.
- Made Rx assemblies CLSCompliant.
- Added aggregate operators.
- Removed properties from Rx.
- Removed timedrift from time based operators.
- Changed default schedulers for operators.
- Cleaned up Generate and GenerateWithTime overloads.
- Renamed GroupDisposable to CompositeDisposable.
- Moved reference required overloads of extension methods to defining namespace.
- Many other small bug fixes.
- Removed BooleanSignal.
- Fixed timing bug in ControlScheduler.
תגים של Technorati:
IObservable,
IObserver,
Rx
Rx - for beginners (part 9): Hot Vs. Cold observable
this post is the 9th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Hot Vs. Cold observable stream.
the code for this post can be download from here.
If a tree falls in a forest and no one is around to hear it,
does it make a sound?
if it do make a sound when nobody observed it, we should mark it as hot,
otherwise it should be marked as cold.
What is Hot observable?
hot observable streams are streams that is active regardless whether or not
they are being observed.
such stream are the mouse move event or timer event.
What is Cold observable?
cold observable streams are streams that is activated upon subscription
(in most case new stream will be activate for each subscription).
the Observable.Interval is a sample for cold observable stream.
Code Sample
the code for this post can be download from here.
the following sample demonstrate the difference between hot and cold observable streams:
Code Snippet
- var timer = new System.Timers.Timer(1000);
- // create Cold observable stream (this stream will not project values
- // until the subscription, and will project new stream for each subscription)
- var coldStream = Observable.Interval(TimeSpan.FromSeconds(1));
-
- // create Hot observable stream out of .NET event
- var hotStream = Observable.FromEvent<ElapsedEventArgs>(timer,"Elapsed");
- DateTime startTime = DateTime.Now;
- timer.Start(); // start the timer (the timer will start producing values
- // regardless of the subscription)
- // convert the hot stream to stream that project the amount of seconds from the timer start
- var hotStreamInterval = hotStream.Select ( value => (value.EventArgs.SignalTime - startTime).TotalSeconds.ToString("N0"));
-
- Thread.Sleep(TimeSpan.FromSeconds(3)); // sleep for 3 seconds befor the subscription
- using (hotStreamInterval.Subscribe (value => Write("Hot: " + value, ConsoleColor.Green)))
- using(coldStream.Subscribe(value => Write("\t\tCold 1: " + value,ConsoleColor.Yellow)))
- {
- Thread.Sleep(TimeSpan.FromSeconds(2)); // sleep for 2 seconds befor the second subscription
- // second subscription
- using(coldStream.Subscribe(value => Write("\t\t\t\tCold 2: " + value,ConsoleColor.DarkYellow)))
- {
- Thread.Sleep(TimeSpan.FromSeconds(4)); // wait 4 second before disposal
- } // dispose hotStreamInterval and coldStream 2
- } // dispose hotStreamInterval and coldStream 1
line 1, create system timer with 1 second interval.
line 4, define cold observable stream with interval of 1 second.
line 7, creating hot observable stream out of the timer elapsed event (we can use this technique on any .NET event).
line 9, start the timer (because this is hot stream the timer will not wait for the subscription).
line 12, format the timer stream into more friendly observable stream that will project the timer duration in seconds.
line 14, sleeping 3 second before the subscription (this will make the difference between the hot streams that will
increase the value even though no body is listening and the cold stream that doesn't construct until the subscription).
line 15, subscribing to the hot observable stream (the using statement is the subscription scope).
line 16, first subscription to the cold observable stream.
line 18, sleeping before the second subscription to the cold stream.
line 20, second subscription to the cold observable stream (the second subscription will construct new observable stream).
line 22, waiting 4 second before leaving the subscription scope.
the output will look as follow:

- the hot timer observable stream (green) is starting with value = 3 because it is hot stream
which was producing value during the 3 second sleeping period - the first cold observable stream (yellow) is starting with value = 0 because the stream were constructed after
the sleep period. - the second cold observable stream (dark yellow) is starting with value = 0 because each subscription
is constructing new stream.
Summary

subscribing to hot observable stream will produce different result
from cold observable stream subscription, and it is important to
be aware for the nature of the stream you subscribing to.
תגים של Technorati:
Rx,
IObservable,
IObserver
Rx - for beginners (part 8): Combine Latest expression

this post is the 8th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Combine Latest expression.
the code for this post can be download from here.
What does it do?
like the zip expression the Combine Latest expression is used to synchronize
2 IObservable streams into single IObservable stream.
unlike the zip expression the Combine Latest doesn't using queue,
as it name suggest it only remember the latest value of each stream.
What is the combination strategy?
the strategy used by the Combine Latest is to observe both stream, and each time
it new value observed on either of the stream it combine it with the latest value observed on the other stream.
the marble diagrams of Combine Latest expression will look as follow:
we can see that when the y stream observed the b value, the result stream
combine it with the latest value observed on x stream (3) and latter the same value
will be combine into the result stream when the x stream will observed the 4 value.
When does it halt?
the Combine Latest processing will come to end either when one of the stream
will complete or throw exception.
Code sample
the code for this post can be download from here.
Helper method for creating interval stream:
Code Snippet
- private static IObservable<string> CreateObservable(
- string prefix,int stopAt, double secondsInterval)
- {
- var inteval = TimeSpan.FromSeconds(secondsInterval);
- var eventStream = Observable.Interval(inteval).
- TakeWhile(value => value < stopAt).
- Select(value => prefix + value.ToString());
-
- return eventStream;
- }
the method parameters include (line 2):
- prefix: just so we can distinguish values that was created by one stream from
values created on the other (see line 7). - stop at: define how many values will be produce by the stream before complete (see line 6).
- seconds interval: define the frequency that the stream produce values (see lines 4,5)
The main method is looking as follow:
Code Snippet
- static void Main(string[] args)
- {
- var xs = CreateObservable("X",15, 0.5);
- var ys = CreateObservable("Y",2, 2);
-
- var combineLatestStream = xs.CombineLatest(ys, (x,y) => x + " : " + y);
- combineLatestStream.Subscribe(value => Console.WriteLine(value),() => Console.WriteLine("Completed"));
- Console.ReadKey();
- }
line 3, create stream which will produce 15 values before completion at frequency of 0.5 seconds.
line 4, create stream which will produce 2 values before completion at frequency of 2 seconds.
line 6, is combine both streams, the Lamda is using to concatenate both stream's values.
The result will look as follow:
Summary
Combine Latest expression can be used to synchronize 2 streams,
while the output stream will get notified when ever either of the stream produce new value.
תגים של Technorati:
Rx,
IObservable,
IObserver
Calling WCF secured service from Java
recently I was working on exposing secured WCF service
to Java consumers.
I was responsible for the .NET side and a java expert named Tsvika
responsible for the Java side.
enabling secured conversation between Java and .NET using WCF is not a trivial task,
and it does needed some additional steps.
you should have certificate install, and having the binding and behaviors configured
in a way that the Java proxy can manage.
Certificates
the first step needed is having a valid certificate.
the certificate should be installed into the service's hosting machine.
you can learn more on how to handle the certificates in here.
Service configuration
Binding
the secure binding should look as follow
Code Snippet
- <wsHttpBinding>
- <binding name="UsernameAndPassword">
- <security mode="Message">
- <message clientCredentialType="UserName"
- negotiateServiceCredential="false"
- establishSecurityContext="false"
- algorithmSuite="Basic128"/>
- </security>
- </binding>
- </wsHttpBinding>
line 3, the binding mode should be Message
line 4, the client certificate type should be UserName.
line 5, the negotiation service credentials must switch off.
line 6, the establish security context should also turn off.
Behaviors
use the following behaviors
Code Snippet
- <serviceBehaviors>
- <behavior name="Bnaya.Samples.UsernameAndPassword">
- <serviceMetadata httpGetEnabled="true" policyVersion="Default" />
- <serviceCredentials>
- <serviceCertificate findValue="RPKey" storeLocation="LocalMachine"
- storeName="TrustedPeople" x509FindType="FindBySubjectName" />
- <userNameAuthentication userNamePasswordValidationMode="Custom"
- customUserNamePasswordValidatorType="Bnaya.Samples.MyUserNameValidator, WcfSecureed" />
- </serviceCredentials>
- </behavior>
- </serviceBehaviors>
lines 5-6, define which certificate should use for the service (RPKey is the name of the certificate).
in case of service hosting it is more reasonable to install the certificate on the machine level (instead of the user level) .
lines 7-8, define the authentication handler (as you can see in the following snippet).
Code Snippet
- public sealed class MyUserNameValidator : UserNamePasswordValidator
- {
- public override void Validate(string userName, string password)
- {
- if (userName != "admin" || password !="admin")
- throw new SecurityException("Access denied.");
- }
- }
the validate method is where your authentication code goes.
End point
nothing is special about the definition of the service section
Code Snippet
- <service name="Bnaya.Samples.Service1" behaviorConfiguration="Bnaya.Samples.UsernameAndPassword">
- <host>
- <baseAddresses>
- <add baseAddress="http://localhost:8731/Service1/"/>
- </baseAddresses>
- </host>
- <endpoint address="" binding="wsHttpBinding" contract="Bnaya.Samples.IService1"
- bindingConfiguration="UsernameAndPassword">
- <identity>
- <dns value="localhost"/>
- </identity>
- </endpoint>
- </service>
small tip: replace the localhost with the correct IP so when the Java proxy is generated,
the endpoint will be correct.
Java
java has several libraries that can invoke WCF, unfortunately not all of them
has good implementation for the WCF secured conversation.
the library that do working fine with secured conversation is Sun: Metro Web Services
and more precisely you should use WSIT.
do not waste your time on Axis2 / Rampart it is only half backed in compare to the Sun library.
Summary
this task is not travel, but with the right library and java expert like Yaakov,
it is certainly feasible.
תגים של Technorati:
WCF,
Java
Rx - for beginners (part 7): Zip expression
this post is the 7th in a series of posts about the new Reactive Framework (Rx).
the series TOC can found here.
in this post we will focus on the Zip expression.
the code for this post can be download from here.
What does it do?
the zip expression is used to synchronize 2 IObservable streams into single IObservable stream.
How does it do it?
it is taking the first observed value on either of the stream and wait for value from the other stream.
after both stream produce a value, the zip will project the value combination,
then it will wait for the next couple.
What happens when the stream frequency is not equals?
if one of the stream produce values more frequently than the other, those values will be queue in memory
and each time that the slower stream will produce a value, a single value will be fetched out of the fast stream's queue,
and combined with the slower stream's value.
that mean that you should be aware for possibly side effects, in terms of memory usages.
the marble diagrams of zip expression will look as follow:
When does it halt?
the zip processing will come to end either when one of the stream
will complete or when either of the stream will throw exception.
…..
actually it is a bit more complex,
when one of the stream complete, and still having value within the queued,
the complete notification will be put into the queue.
which mean that the actual complete will take place only the queue become dry
or when the other stream will notify completion.
the marble diagram for this scenario will look as follow:
if either of the stream is throwing exception the exception will immediately
project into the result stream and halt the zip operation.
Code sample
the code for this post can be download from here.
Helper method for creating interval stream:
Code Snippet
- private static IObservable<string> CreateObservable(
- string prefix,int stopAt, double secondsInterval)
- {
- var inteval = TimeSpan.FromSeconds(secondsInterval);
- var eventStream = Observable.Interval(inteval).
- TakeWhile(value => value < stopAt).
- Select(value => prefix + value.ToString());
-
- return eventStream;
- }
the method parameters include (line 2):
- prefix: just so we can distinguish values that was created by one stream from
values created on the other (see line 7). - stop at: define how many values will be produce by the stream before complete (see line 6).
- seconds interval: define the frequency that the stream produce values (see lines 4,5)
The main method is looking as follow:
Code Snippet
- static void Main(string[] args)
- {
- var xs = CreateObservable("X",5, 0.1);
- var ys = CreateObservable(" Y",3, 1);
-
- var zippedStream = xs.Zip (ys, (x, y) => x + " : " + y);
- zippedStream.Subscribe(value => Console.WriteLine(value),() => Console.WriteLine("Complete"));
- Console.ReadKey();
- }
line 3, create stream which will produce 5 values before completion at frequency of 0.1 seconds.
line 4, create stream which will produce 3 values before completion at frequency of 1 seconds.
line 6, is zipping both streams, the Lamda is using to concatenate both stream's values.
The result will look as follow:
Summary
Zip expression can be used to synchronize 2 streams,
but we should be carful of possible memory side effects.
תגים של Technorati:
Rx,
IObservable,
IObserver
More Posts
Next page »