Tried to comment everything, i suck at that but i did my best.
For this i used the SocketAsyncEventArgs class for accept and I/O operations, Disconnect is done synchronously.
So lets divide all the code so i can explain it better
Buffer Pool
According to msdn, we can create one main big buffer, and assign to the SocketAsyncEventArgs (SAEA from now) instance a part of it, this should avoid or reduce the memory fragmentation caused for allocating bytes[] for each SAEA.
The buffer pool cass (BufferPool.cs) is generic, so i can create a buffer pool of any data type, for this had to create another class (BufferSegment.cs) containing the index, the length and the BufferPool instance that created the segment.
BufferPool.cs:
Code:
/// <summary> /// Wraps an Array to divide it in segments to be used, maintain a single array should /// reduce memory fragmentation /// </summary> public class BufferPool<T> { /// <summary> /// Gets the length of the segment that the main buffer will be divided /// </summary> public readonly int SubBufferLength; /// <summary> /// Used to measure the time since the last resize operation was done /// </summary> Stopwatch _resizeTime; /// <summary> /// The main buffer, will be divided in blocks of SubBufferLength amount of elements /// </summary> public T[] MainBuffer; /// <summary> /// Queue containing all the free segments from the main buffer /// </summary> public ConcurrentQueue<BufferSegment<T>> _freeSegmentsQueue; /// <summary> /// Handles the buffers of the SocketAsyncEventArgs instances used in a server /// dividing a main buffer to avoid memory fragmentation /// </summary> public BufferPool(int initialCapacity, int subBufferSize) { this.SubBufferLength = subBufferSize; this._freeSegmentsQueue = new ConcurrentQueue<BufferSegment<T>>(); this.MainBuffer = new T[initialCapacity * subBufferSize]; for (int i = 0; i < initialCapacity * subBufferSize; i += SubBufferLength) { _freeSegmentsQueue.Enqueue(new BufferSegment<T>(i, SubBufferLength, this)); } this._resizeTime = new Stopwatch(); this._resizeTime.Start(); } /// <summary> /// Increases the length of the main buffer two times, similar to /// what C++ vector class does /// </summary> private void ResizeBuffer() { if (_resizeTime.ElapsedMilliseconds > 5) { _resizeTime.Restart(); Console.WriteLine("resizing! {0}", MainBuffer.Length / SubBufferLength); int originalLength = MainBuffer.Length; Array.Resize<T>(ref MainBuffer, originalLength * 2); for (int i = originalLength; i < MainBuffer.Length; i += SubBufferLength) { _freeSegmentsQueue.Enqueue(new BufferSegment<T>(i, SubBufferLength, this)); } } } /// <summary> /// Gets a BufferSegment instance containing the index and length of a block of /// memory from the main buffer /// </summary> public BufferSegment<T> GetSegment() { BufferSegment<T> freeSegment; if (_freeSegmentsQueue.TryDequeue(out freeSegment)) { return freeSegment; } else { ResizeBuffer(); freeSegment = GetSegment(); } return freeSegment; } /// <summary> /// Frees a BufferSegment to be reused later /// </summary> /// <param name="segment">Segment released</param> public void FreeBuffer(BufferSegment<T> segment) { _freeSegmentsQueue.Enqueue(segment); } }
Code:
/// <summary> /// Belongs to a BufferPool instance, contains an Index and Length /// to represent the buffer segment /// </summary> /// <typeparam name="T"></typeparam> public class BufferSegment<T> { /// <summary> /// BufferPool instance that created this BufferSegment /// </summary> public BufferPool<T> Owner; /// <summary> /// Gets the index of the main buffer where this segment is located /// </summary> public int Index { get; private set; } /// <summary> /// Gets the length of this segment /// </summary> public int Length { get; private set; } /// <summary> /// Belongs to a BufferPool instance, contains an Index and Length /// to represent the buffer segment /// </summary> public BufferSegment(int index, int length, BufferPool<T> owner) { Owner = owner; Index = index; Length = length; } /// <summary> /// Frees a BufferSegment to be reused later /// </summary> public void Free() { Owner.FreeBuffer(this); } }
SocketAsyncEventArgs Pool
According to msdn again, SocketAsyncEventArgs can be reused to reduce the GC pressure, quite easy to implement, the class just wraps a queue of SAEA's
SocketAsyncEventArgsPool.cs:
Code:
/// <summary> /// Encapsulates a collection of SocketAsyncEventArgs that can be reused /// </summary> public class SocketAsyncEventArgsPool { /// <summary> /// Contains all the SocketAsyncEventArgs that aren't being used by any /// AsyncClient instance /// </summary> ConcurrentQueue<SocketAsyncEventArgs> _socketAsyncEventArgsQueue; /// <summary> /// Encapsulates a collection of SocketAsyncEventArgs that can be reused /// </summary> public SocketAsyncEventArgsPool() { _socketAsyncEventArgsQueue = new ConcurrentQueue<SocketAsyncEventArgs>(); } /// <summary> /// Try to gets a non used SocketAsyncEventArgs from the pool /// </summary> /// <param name="e">Reference that will hold the non used /// SocketAsyncEventArgs instance</param> /// <returns>Returns True if the operation was completed succesfully, /// False otherwise</returns> public bool TryPop(out SocketAsyncEventArgs e) { return _socketAsyncEventArgsQueue.TryDequeue(out e); } /// <summary> /// Adds a SocketAsyncEventArgs instance that is not being used /// to the pool /// </summary> /// <param name="e">SocketAsyncEventArgs that will be added to the pool</param> public void Push(SocketAsyncEventArgs e) { _socketAsyncEventArgsQueue.Enqueue(e); } }
Circular Stream
To avoid packet fragmentation, instead of receiving the first 2 bytes (length), decrypt, and receive the 'length' bytes, i decided to create a circular stream, a precreated and big enough byte array and some maths to know where the next read/write operations must be done, after reading the Length of the stream will be decreased, and vice-versa when writing, this class inherits some functions from the Stream class, and wraps a CircularBuffer, a class similar to what i have described but generic (stream works with bytes, that was the only reason to separate them).
So to resume, the socket will always receive the max. amount possible of bytes, write them to the circular stream, and repeat, on the other hand, for processing, it's possible to read the necesary bytes to know the packet length, and check if enough bytes have been received or not.
CircularBuffer.cs
Code:
/// <summary> /// A buffer implemented like a queue with a maximum length that will not allocate /// extra memory to enqueue data, once maximum length is reached, no more data can /// be enqueued. /// </summary> /// <typeparam name="T">Type of the elements that will hold the buffer</typeparam> public class CircularBuffer<T> : IEnumerable<T> { /// <summary> /// Array containing the elements of the buffer /// </summary> private T[] _buffer; /// <summary> /// Gets the maximum length of the buffer /// </summary> public int Length { get { return _buffer.Length; } } /// <summary> /// Gets the index of the buffer where the next "Add" operation will be executed /// </summary> public int Head{get; private set;} /// <summary> /// Gets the index of the buffer where the next "Get" operation will be executed /// </summary> public int Tail{get; private set;} /// <summary> /// Gets the amount of elements in the buffer /// </summary> public int Count { get; private set; } /// <summary> /// A buffer implemented like a queue with a maximum length that will not allocate /// extra memory to enqueue data, once maximum length is reached, no more data can /// be enqueued. /// </summary> /// <param name="capacity">Maximum amount of elements that the buffer will hold</param> public CircularBuffer(int capacity) { _buffer = new T[capacity]; } /// <summary> /// Changes the Write index of the buffer. /// </summary> /// <param name="amount">amount that the Write index will be incremented.</param> private void IncrementHead(int amount) { if (Head + amount >= Length) { Head = Head + amount - Length; } else { Head += amount; } Count += amount; } /// <summary> /// Changes the Read index of the buffer. /// </summary> /// <param name="amount">Amount that the Read index will be incremented.</param> private void IncrementTail(int amount) { if (Tail + amount >= Length) { Tail = Tail + amount - Length; } else { Tail += amount; } Count -= amount; } /// <summary> /// Adds a new element to the buffer and increases the Write index by one, returns /// false if there is not enough space to add the elements. /// </summary> /// <param name="value">Value that will be added to the buffer</param> /// <returns></returns> public bool TryAdd(T value) { bool succesfullyAdded = false; if (Count < Length) { _buffer[Head] = value; succesfullyAdded = true; IncrementHead(1); } return succesfullyAdded; } /// <summary> /// Adds an array of elements to the buffer and increases the Write index /// acording to the length parameter, returns false if there is not enough space /// to add the elements. /// </summary> /// <param name="values">Array containing the values that will be added</param> /// <param name="index">Index where is located the first element that will be copied from the array</param> /// <param name="length">Amount of elements that will be copied from the array</param> /// <returns></returns> public bool TryAdd(T[] values, int index, int length) { bool succesfullyAdded = false; if (this.Count + length <= this.Length && values.Count() >= index + length) { int copyLength = Math.Min(this.Length - Head, length); Array.Copy(values, index, _buffer, Head, copyLength); IncrementHead(copyLength); if (copyLength < length) succesfullyAdded = TryAdd(values, index + copyLength, length - copyLength); else succesfullyAdded = true; } return succesfullyAdded; } /// <summary> /// Gets the first element from the buffer and increases the Read index by one, returns /// false if there are not any element in the buffer /// </summary> /// <param name="value"></param> /// <returns></returns> public bool TryGet(out T value) { bool succesfullyRemoved = false; value = default(T); if (Count > 0) { value = _buffer[Tail]; _buffer[Tail] = default(T); IncrementTail(1); succesfullyRemoved = true; } return succesfullyRemoved; } /// <summary> /// Copies values from the buffer to the readBuffer parameter at the specified index, /// returns false if the length parameter is major to the amount of elements in the buffer /// </summary> /// <param name="readBuffer">Buffer where the elements will be copied</param> /// <param name="index">Index of the readBuffer where the elements will be copied</param> /// <param name="length">amount of elements that will be read from the buffer</param> /// <returns></returns> public bool TryGet(T[] readBuffer, int index, int length) { bool succesfullyRemoved = false; if (this.Count - length >= 0 && readBuffer.Count() >= index + length) { int copyLength = Math.Min(this.Length - Tail, length); Array.Copy(_buffer, Tail, readBuffer, index, copyLength); IncrementTail(copyLength); if (copyLength < length) succesfullyRemoved = TryGet(readBuffer, index + copyLength, length - copyLength); else succesfullyRemoved = true; } return succesfullyRemoved; } /// <summary> /// Delete values from the buffer and increases the Read index /// </summary> /// <param name="amount">Amount of elements that will be skipped</param> public void Skip(int amount) { amount = Math.Min(amount, Count); for (int i = 0; i < amount; i++) { _buffer[Tail] = default(T); IncrementTail(1); } } public IEnumerator<T> GetEnumerator() { int bufferIndex = Tail; for (int i = 0; i < Count; i++, bufferIndex++) { if (bufferIndex >= Length) bufferIndex = 0; yield return _buffer[bufferIndex]; } } IEnumerator IEnumerable.GetEnumerator() { int bufferIndex = Tail; for (int i = 0; i < Count; i++, bufferIndex++) { if (bufferIndex >= Length) bufferIndex = 0; yield return _buffer[bufferIndex]; } } public T this[int index] { get { if (Count >= index) { if (index + Tail >= Length) index = Tail + index - Length; else index += Tail; return _buffer[index]; } else return default(T); } } }
Code:
/// <summary> /// Wraps a CircularBuffer of bytes and adds stream properties to it /// </summary> public sealed class CircularStream : Stream, IDisposable { /// <summary> /// Buffer containing the stream data /// </summary> private CircularBuffer<byte> _baseBuffer; /// <summary> /// Object for synchronization between different threads /// </summary> private object _syncRoot = new object(); /// <summary> /// Gets an object for synchronization between different threads /// </summary> public object SyncRoot { get { return _syncRoot; } } /// <summary> /// Gets the length of the stream /// </summary> public override long Length { get { return _baseBuffer.Count; } } /// <summary> /// Returns true if at least one byte can be read from the stream /// </summary> public override bool CanRead { get { return _baseBuffer.Count > 0; } } /// <summary> /// Returns true if at least one byte can be skiped /// </summary> public override bool CanSeek { get { return CanRead; } } /// <summary> /// Returns true if at least one byte can be written to the stream /// </summary> public override bool CanWrite { get { return _baseBuffer.Count < _baseBuffer.Length; } } /// <summary> /// Wraps a CircularBuffer of bytes and adds stream properties to it /// </summary> public CircularStream(int length) { _baseBuffer = new CircularBuffer<byte>(length); } /// <summary> /// Release all the resources used by the stream /// </summary> public override void Close() { base.Close(); _baseBuffer = null; } /// <summary> /// Writes a byte in the stream /// </summary> /// <param name="value">byte that will be written into the stream</param> public override void WriteByte(byte value) { if (!_baseBuffer.TryAdd(value)) throw new EndOfStreamException("Can not write more values to the Circular stream!"); } /// <summary> /// Writes an array of bytes to the stream /// </summary> /// <param name="buffer">array containing the bytes that will be written</param> /// <param name="offset">Index of the first byte from the array that will be written</param> /// <param name="count">Amount of bytes that will be written</param> public override void Write(byte[] buffer, int offset, int count) { if(!_baseBuffer.TryAdd(buffer, offset, count)) throw new EndOfStreamException("Can not write more values to the Circular stream!"); } /// <summary> /// Reads on byte from the stream, returns -1 if there are no elements in the stream /// </summary> /// <returns></returns> public override int ReadByte() { byte val; if (!_baseBuffer.TryGet(out val)) return -1; return val; } /// <summary> /// Reads a determined amount of bytes and copy them inside the buffer parameter, returns /// the amount of bytes read of -1 if no element could be read /// </summary> /// <param name="buffer">Array where the read bytes will be copied</param> /// <param name="offset">Index of the array where the first element will be copied</param> /// <param name="count">Amount of bytes that will be read</param> /// <returns></returns> public override int Read(byte[] buffer, int offset, int count) { if (!_baseBuffer.TryGet(buffer, offset, count)) return -1; return count; } /// <summary> /// Reads a short from the stream /// </summary> /// <returns></returns> public short ReadInt16() { byte[] bytes = new byte[2]; if (Read(bytes, 0, 2) > 0) return BitConverter.ToInt16(bytes, 0); else throw new EndOfStreamException(); } /// <summary> /// Reads an int from the stream /// </summary> /// <returns></returns> public int ReadInt32() { byte[] bytes = new byte[4]; if (Read(bytes, 0, 4) > 0) return BitConverter.ToInt32(bytes, 0); else throw new EndOfStreamException(); } /// <summary> /// Reads a long from the stream /// </summary> /// <returns></returns> public long ReadInt64() { byte[] bytes = new byte[8]; if (Read(bytes, 0, 8) > 0) return BitConverter.ToInt64(bytes, 0); else throw new EndOfStreamException(); } /// <summary> /// Reads a byte from the stream without removing it /// </summary> /// <returns></returns> public byte PeekByte() { if (Length < sizeof(byte)) throw new EndOfStreamException(); return _baseBuffer[0]; } /// <summary> /// Reads a short from the stream without removing it /// </summary> /// <returns></returns> public ushort PeekUInt16() { if (Length < sizeof(ushort)) throw new EndOfStreamException(); ushort value = 0; for (int i = 0; i < sizeof(ushort); i++) value += (ushort)(_baseBuffer[i] << (8 * i)); return value; } /// <summary> /// Reads a int from the stream without removing it /// </summary> /// <returns></returns> public uint PeekUInt32() { if (Length < sizeof(uint)) throw new EndOfStreamException(); uint value = 0; for (int i = 0; i < sizeof(uint); i++) value += (uint)_baseBuffer[i] << (8 * i); return value; } /// <summary> /// Reads a long from the stream without removing it /// </summary> /// <returns></returns> public ulong PeekUInt64() { if (Length < sizeof(ulong)) throw new EndOfStreamException(); ulong value = 0; for (int i = 0; i < sizeof(ulong); i++) value += (ulong)_baseBuffer[i] << (8 * i); return value; } /// <summary> /// Clears the stream /// </summary> public override void Flush() { _baseBuffer.Skip(_baseBuffer.Count); } /// <summary> /// Release all the resources used by the stream /// </summary> public void Dispose() { Close(); } // Not implemented functions public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); } public override void SetLength(long value) { throw new NotImplementedException(); } public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { throw new NotImplementedException(); } public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { throw new NotImplementedException(); } public override bool CanTimeout { get { throw new NotImplementedException(); } } public override System.Runtime.Remoting.ObjRef CreateObjRef(Type requestedType) { throw new NotImplementedException(); } public override int EndRead(IAsyncResult asyncResult) { throw new NotImplementedException(); } public override void EndWrite(IAsyncResult asyncResult) { throw new NotImplementedException(); } public override object InitializeLifetimeService() { throw new NotImplementedException(); } public override int ReadTimeout { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } public override int WriteTimeout { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } public override long Position { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } }
The Common things
Based this a lot on how sources uses the APM for their socket servers, an AsyncServer class that will accept the connections, an AsyncClient class that will do the I/O operations and the Disconnect metod, and the SocketEvents class, that will hold the delegates for notifying when an asynchronous operations has finished.
All them are generic only to avoid the common
MyClient = asyncClient.UserToken as MyClient;
^^
AsyncServer.cs
Code:
/// <summary> /// Wraps a Socket instance to accept incoming connections asynchronously using /// SocketAsyncEventArgs /// </summary> /// <typeparam name="TUserToken"></typeparam> public class AsyncServer<TUserToken> : SocketEvents<TUserToken> { /// <summary> /// Buffer pool where the AsyncClients will get buffer segments for /// their I/O operations /// </summary> public BufferPool<byte> BufferPool; /// <summary> /// Socket instance used to accept incoming connections /// </summary> public Socket Connection { get; private set; } /// <summary> /// Port that the Connection is listening to /// </summary> public int Port { get; private set; } /// <summary> /// Wraps a Socket instance to accept incoming connections asynchronously using /// SocketAsyncEventArgs /// </summary> /// <param name="estimatedConnetions">Aproximation of the amount of concurrent connections /// that will hande the server</param> /// <param name="MaxPacketLength">Maximum amount of bytes that will be received</param> /// <param name="addressFamily">AddressFamily used in the Socket constructor</param> /// <param name="socketType">SocketType used in the Socket constructor</param> /// <param name="protocolType">ProtocolType used in the Socket constructor</param> public AsyncServer(int estimatedConnetions, int MaxPacketLength, AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType) { this.Connection = new Socket(addressFamily, socketType, protocolType); BufferPool = new BufferPool<byte>(estimatedConnetions, MaxPacketLength); } /// <summary> /// Binds the Connection to a local endpoint using a specified port and a local IP Address /// </summary> /// <param name="port">local port that will the connection will be binded to</param> public void Bind(int port) { Connection.Bind(new IPEndPoint(IPAddress.Any, port)); Port = port; } /// <summary> /// Makes the Connection socket start listening /// </summary> /// <param name="backlog">Maximum amount of pending connection</param> public void Listen(int backlog) { Connection.Listen(backlog); SocketAsyncEventArgs e = new SocketAsyncEventArgs(); e.Completed += AcceptCompleted; bool willRaiseEvent = Connection.AcceptAsync(e); if (!willRaiseEvent) ProcessAccept(e); } /// <summary> /// Action invoked when an accept operation has finished asynchronously /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void AcceptCompleted(object sender, SocketAsyncEventArgs e) { ProcessAccept(e); } /// <summary> /// Action raised when an accept operation has finished /// </summary> /// <param name="e"></param> private void ProcessAccept(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { AsyncClient<TUserToken> newClient = new AsyncClient<TUserToken>(e.AcceptSocket, this); base.Call_SocketConnected(newClient); newClient.StartReceiving(); } e.AcceptSocket = null; bool willRaiseEvent = Connection.AcceptAsync(e); if (!willRaiseEvent) ProcessAccept(e); } }
Code:
/// <summary> /// Wraps a System.Net.Sockets.Socket instance to provide an easy way /// to send/receive data asynchronously using SocketAsyncEventArgs /// </summary> public class AsyncClient<TUserToken> : SocketEvents<TUserToken> { /// <summary> /// Pool of SocketAsyncEventArgs to use on the socket Input/Output (data transfer) /// actions /// </summary> private static SocketAsyncEventArgsPool IOEventsPool = new SocketAsyncEventArgsPool(); /// <summary> /// To avoid double Disconnect calls /// </summary> private bool _disconnected = false; /// <summary> /// SocketAsyncEventArgs used to receive data asynchronously, only one instance must /// be used for receiving, must be returned to the IOEventsPool when the Socket /// has been disconnected /// </summary> private SocketAsyncEventArgs _receiveEvent; /// <summary> /// The server that created the this AsyncClient instance, inherits the BufferPool /// class so one main buffer is kept per server /// </summary> private AsyncServer<TUserToken> _ownerServer; /// <summary> /// The segment that the receiveEvent is associated with /// </summary> private BufferSegment<byte> _bufferSegment; /// <summary> /// Stream where the packets received are written /// </summary> public CircularStream DataStream; /// <summary> /// Gets the Socket wrapped by this AsyncClient instance /// </summary> public Socket Connection { get; private set; } /// <summary> /// Gets or sets the object that is used to recognize this AsyncClient instance /// </summary> public TUserToken Owner; /// <summary> /// Wraps a System.Net.Sockets.Socket instance to provide an easy way /// to send/receive data asynchronously using SocketAsyncEventArgs /// </summary> /// <param name="socket">Socket that will be used for sending/receiving data</param> /// <param name="server">AsyncServer where this AsyncClient instance was created</param> public AsyncClient(Socket socket, AsyncServer<TUserToken> server) { server.CopyTo(this); this.Connection = socket; this._ownerServer = server; } /// <summary> /// Signals the Socket to start receiving packets /// </summary> public void StartReceiving() { if (_receiveEvent == null) { if (!IOEventsPool.TryPop(out _receiveEvent)) { _receiveEvent = new SocketAsyncEventArgs(); } _bufferSegment = _ownerServer.BufferPool.GetSegment(); _receiveEvent.Completed += Completed; _receiveEvent.SetBuffer(_ownerServer.BufferPool.MainBuffer, _bufferSegment.Index, _bufferSegment.Length); bool willRaiseEvent = Connection.ReceiveAsync(_receiveEvent); if (!willRaiseEvent) ProcessReceive(_receiveEvent); } } /// <summary> /// Changes the maximum length of the DataReceived stream, if there are /// data in the stream will be copied to a new stream /// </summary> /// <param name="length"></param> public void SetMaxDataStreamLength(int length) { CircularStream newStream = new CircularStream(length); if (DataStream != null) { byte[] oldBuffer = new byte[DataStream.Length]; DataStream.Read(oldBuffer, 0, oldBuffer.Length); newStream.Write(oldBuffer, 0, (int)Math.Min(oldBuffer.Length, newStream.Length)); } DataStream = newStream; } /// <summary> /// Sends a byte array asynchronously using SocketAsyncEventArgs /// </summary> /// <param name="buffer">Data that will be sent</param> public void Send(byte[] buffer) { SocketAsyncEventArgs e; if (!IOEventsPool.TryPop(out e)) { e = new SocketAsyncEventArgs(); } e.Completed += Completed; BufferSegment<byte> segment = _ownerServer.BufferPool.GetSegment(); e.UserToken = segment ; e.SetBuffer(segment.Owner.MainBuffer, segment.Index, buffer.Length); Buffer.BlockCopy(buffer, 0, e.Buffer, segment.Index, buffer.Length); bool willRaiseEvent = Connection.SendAsync(e); if (!willRaiseEvent) ProcessSend(e); } /// <summary> /// Action invoked when a SendAsync operation has finished /// </summary> /// <param name="e"></param> private void ProcessSend(SocketAsyncEventArgs e) { BufferSegment<byte> segment = e.UserToken as BufferSegment<byte>; segment.Free(); e.SetBuffer(null, 0, 0); e.Completed -= Completed; IOEventsPool.Push(e); } /// <summary> /// Action invoked when an Input/Output operation has finished asynchronously /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void Completed(object sender, SocketAsyncEventArgs e) { switch (e.LastOperation) { case SocketAsyncOperation.Receive: ProcessReceive(e); break; case SocketAsyncOperation.Send: ProcessSend(e); break; } } /// <summary> /// Action invoked when a ReceiveAsync operation has finished /// </summary> /// <param name="e"></param> private void ProcessReceive(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success && e.BytesTransferred > 0 && Connection.Connected) { try { DataStream.Write(e.Buffer, _bufferSegment.Index, e.BytesTransferred); base.Call_DataReceived(this, e.Buffer, e.BytesTransferred); bool willRaiseEvent = Connection.ReceiveAsync(e); if (!willRaiseEvent) ProcessReceive(e); } catch { Disconnect(); } } else { Disconnect(); } } /// <summary> /// Disconnects the Socket wrapped and raises the Events.SocketDisconnected action /// </summary> private void Disconnect() { if (!_disconnected) { _disconnected = true; try { _receiveEvent.SetBuffer(null, 0, 0); _receiveEvent.Completed -= Completed; IOEventsPool.Push(_receiveEvent); _bufferSegment.Free(); Connection.Close(); Connection.Dispose(); DataStream.Dispose(); } finally { Call_SocketDisconnected(this); } } } }
Code:
public delegate void SocketConnectionEventHandler<TClient>(AsyncClient<TClient> sender); public delegate void SocketTransferEventHandler<TClient>(AsyncClient<TClient> sender, byte[] data, int length); /// <summary> /// Contains the delegates invoked when a socket is accepted, disconnected or it /// has received data /// </summary> /// <typeparam name="TUserToken"></typeparam> public class SocketEvents<TUserToken> { /// <summary> /// Notifies to teh server that a new socket has been accepted. /// </summary> public event SocketConnectionEventHandler<TUserToken> SocketConnected; /// <summary> /// Notifies to the server that a client has been disconnected. /// </summary> public event SocketConnectionEventHandler<TUserToken> SocketDisconnected; /// <summary> /// Notifies to the server that a client has received data, then the data will /// be writed to the DataStream, use this event to implement a cipher. /// </summary> public event SocketTransferEventHandler<TUserToken> DataReceived; /// <summary> /// Copy all the delegates from the current instance to another SocketEvents instance /// </summary> /// <param name="target">SocketEvenst instance where the delegates will be copied</param> public void CopyTo(SocketEvents<TUserToken> target) { target.SocketDisconnected = SocketDisconnected; target.DataReceived = DataReceived; target.SocketConnected = SocketConnected; } protected void Call_SocketConnected(AsyncClient<TUserToken> client) { if (SocketConnected != null) SocketConnected(client); } protected void Call_SocketDisconnected(AsyncClient<TUserToken> client) { if (SocketDisconnected != null) SocketDisconnected(client); } protected void Call_DataReceived(AsyncClient<TUserToken> client, byte[] data, int amount) { if (DataReceived != null) DataReceived(client, data, amount); } }
Using the code
I'm lazy now so here you have an example, it's quite a bad example but it worked :P
Code:
AsyncServer<int> server = new AsyncServer<int>(0, 1024, AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); server.Events.SocketConnected = Connected; server.Events.SocketDisconnected = Disconnected; server.Events.DataReceived = Received; server.Bind(9958); server.Listen(100);
Code:
static void Connected(AsyncClient<int> client) { Console.WriteLine("connected"); client.SetMaxDataStreamLength(1024); // Initializes the DataStream for the client // Here, i assume that NEVER, there will be more than 1024 bytes that must be processed // A big enough number must be used when using this socket server with apps like conquer // Where packet processing migth become slow } static void Received(AsyncClient<int> client) { while (client.DataStream.Length > 0) { if (client.DataStream.Length >= 2) // assuming that the first 2 bytes are the total size { short size = client.DataStream.ReadInt16(); if (client.DataStream.Length >= size - 2) { byte[] bytes = new byte[size - 2]; client.DataStream.Read(bytes, 0, size - 2); Console.WriteLine(Encoding.ASCII.GetString(bytes)); } else break; //Size should be queued so on next notification that size will be checked } else break; } } static void Disconnected(AsyncClient<int> client) { Console.WriteLine("DC!"); }
To finish, i haven't fully tested this yet so some errors may happen, nor i have fully coded it yet, wanted to implement more Read functions to the CircularStream, but meh i'm lazy
Forgot to say, there is an extra parameter in the AsyncServer constructor (footer length), wanted to remove the "TQClient" seal before writing the packet to the stream, but didn't find any reason to do it.