Rx Challenge #4

2015/03/25

no comments

this challenge was taken from real life problem.

It was one of my colleague who ask about reacting to Bluetooth device’s output

the device provides stream of byte[] that represent message header + body
for each byte[].
the first chunk of bytes represent the header (kind, chunk count)
– the first byte represent message kind (Enum of None = 0, MessageA = 1, MessageB = 2)
– the second byte represent number of chunk (the message split to)
on each following message’s chunk
– the first byte is equals to zero (this is how you know that the message is not a starting point)
  when the first byte != 0 than it’s new message

for example:
      {1,3,32,4,34,54} means Starting message of type MessageA
                                 which is built from 3 different buffers
                                 and the bytes {32,4,34,54} are the first actual bytes
      {0,4,65,3}           means continuation for the previous message
                                 which contains following bytes {4,65,3}
      {0,81,3}              means the closing of the message
                                 which contains following bytes {81,3}

 

your challenge is to build messages from split chunks.

use the following test to validate your solution.

you can post your solution to this Rx’s forum thread

[TestClass]
public class DeviceListeningQuestionTest
{
[TestMethod]
public void SwitchPlusTests_Test()
{
// device provides stream of byte[] that represent message header + body
// for each byte[] the first byte represent the header
// the first byte represent Message Type Enum of (None, MessageA, MessageB, C)
// while the real message may be construct from multiple sequence of byte[]
// when the first byte != None the 2nd byte represent the number of byte[] used
// for construction of the full message
// when the first byte == None the next bytes should be concatenate with the
// previous byte[] in order to construct the full message
// for example:
// {1,3,32,4,34,54} means Starting message from type MessageA
// which is built from 3 different buffers
// and the bytes {32,4,34,54} are the first actual bytes
// {0,4,65,3} means continuation for the previous message
// which contains following bytes {4,65,3}
// {0,81,3} means the closing of the message
// which contains following bytes {81,3}

var scheduler = new TestScheduler();
var observer = scheduler.CreateObserver<MessageBase>();
var source = new Subject<byte[]>();

var messages = source // TODO: convert from buffer to message
.Select(m => null as MessageBase);
messages.Subscribe(observer);

MessageA a = new MessageA { Name = "Bnaya", Date = DateTime.Today };
Send(MessageType.A, a, source);
MessageB b = new MessageB { Id = 1, Duration = TimeSpan.FromSeconds(1277278) };
Send(MessageType.B, b, source);

var results = observer.Messages
.Where(m => m.Value.Kind == NotificationKind.OnNext)
.Select(m => m.Value.Value).ToArray();

Assert.AreEqual(2, results.Length);
var resultA = (MessageA)results[0];
Assert.IsTrue(resultA.Date == a.Date && resultA.Name == a.Name);
var resultB = (MessageB)results[1];
Assert.IsTrue(resultB.Duration == b.Duration && resultB.Id == b.Id);
}

private class MessageBuilder
{
public MessageBuilder(byte countdown = 0, MessageType kind = MessageType.None, byte[] buffer = null)
{
Buffer = buffer ?? new byte[0];
Countdown = countdown;
Kind = kind;
}

private byte[] Buffer { get; set; }
private MessageType Kind = MessageType.None;
public byte Countdown {get; private set;}

public static MessageBuilder OnNext(MessageBuilder acc, byte[] buffer)
{
MessageType kind = acc.Kind;
if (buffer[0] == 0)
{
buffer = acc.Buffer
.Concat(buffer.Skip(1))
.ToArray();
}
else
{
kind = (MessageType)buffer[0];
acc.Countdown = buffer[1];
buffer = buffer.Skip(2).ToArray();
}

byte countdown = (byte)(acc.Countdown - 1);
return new MessageBuilder(countdown, kind, buffer);
}

public MessageBase ToMessage()
{
MessageBase result = null;
string tmp = Encoding.UTF8.GetString(Buffer);
if(Kind == MessageType.A)
result = JsonConvert.DeserializeObject<MessageA>(tmp);
if(Kind == MessageType.B)
result = JsonConvert.DeserializeObject<MessageB>(tmp);
return result;
}
}

private static void Send(MessageType kind, MessageBase message, IObserver<byte[]> source)
{
byte pageCount = 2;
string tmp = JsonConvert.SerializeObject(message);
byte[] buffer = Encoding.UTF8.GetBytes(tmp);
byte[][] parts = (from i in Enumerable.Range(0, pageCount)
let len = buffer.Length
let pageSize = len / pageCount + len % pageCount
let page = buffer.Skip(pageSize * i)
.Take(pageSize).ToArray()
select page).ToArray();

var first = new[] {(byte)kind, pageCount};
first = first.Concat(parts[0]).ToArray();
source.OnNext(first);
byte[] chunk = new[] { (byte)0 };
for (int i = 1; i < pageCount; i++)
{
chunk = chunk.Concat(parts[i]).ToArray();
source.OnNext(chunk);
}
}

public enum MessageType : byte
{
None = 0,
A = 1,
B = 2,
}
public abstract class MessageBase
{
}

public class MessageA : MessageBase
{
public string Name { get; set; }
public DateTime Date { get; set; }
}

public class MessageB : MessageBase
{
public int Id { get; set; }
public TimeSpan Duration { get; set; }
}
}

Good luck

Add comment
facebook linkedin twitter email

Leave a Reply

Your email address will not be published.

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

*