Register for your free account! | Forgot your password?

Go Back   elitepvpers > MMORPGs > Conquer Online 2 > CO2 Programming
You last visited: Today at 03:39

  • Please register to post and access all features, it's quick, easy and FREE!

Advertisement



Async Socket Server (SocketAsyncEventargs, BufferPool, SAEAPool, Etc...)

Discussion on Async Socket Server (SocketAsyncEventargs, BufferPool, SAEAPool, Etc...) within the CO2 Programming forum part of the Conquer Online 2 category.

Reply
 
Old   #1
 
U2_Caparzo's Avatar
 
elite*gold: 0
Join Date: Aug 2011
Posts: 314
Received Thanks: 90
Async Socket Server (SocketAsyncEventargs, BufferPool, SAEAPool, Etc...)

Hi everyone, today i worked a bit with sockets just because wanted to code an asynchronous socket server in the way i think they should work, yet the design was the first idea i had so don't mind if its ugly for your eyes
Tried to comment everything, i suck at that but i did my best.

For this i used the SocketAsyncEventArgs class for accept and I/O operations, Disconnect is done synchronously.

So lets divide all the code so i can explain it better

Buffer Pool


According to msdn, we can create one main big buffer, and assign to the SocketAsyncEventArgs (SAEA from now) instance a part of it, this should avoid or reduce the memory fragmentation caused for allocating bytes[] for each SAEA.

The buffer pool cass (BufferPool.cs) is generic, so i can create a buffer pool of any data type, for this had to create another class (BufferSegment.cs) containing the index, the length and the BufferPool instance that created the segment.
BufferPool.cs:
Code:
    /// <summary>
    /// Wraps an Array to divide it in segments to be used, maintain a single array should
    /// reduce memory fragmentation
    /// </summary>
	public class BufferPool<T>
	{
        /// <summary>
        /// Gets the length of the segment that the main buffer will be divided
        /// </summary>
        public readonly int SubBufferLength;

        /// <summary>
        /// Used to measure the time since the last resize operation was done
        /// </summary>
        Stopwatch _resizeTime;

        /// <summary>
        /// The main buffer, will be divided in blocks of SubBufferLength amount of elements
        /// </summary>
        public T[] MainBuffer;

        /// <summary>
        /// Queue containing all the free segments from the main buffer
        /// </summary>
        public ConcurrentQueue<BufferSegment<T>> _freeSegmentsQueue;

        /// <summary>
        /// Handles the buffers of the SocketAsyncEventArgs instances used in a server
        /// dividing a main buffer to avoid memory fragmentation
        /// </summary>
		public BufferPool(int initialCapacity, int subBufferSize)
		{
            this.SubBufferLength = subBufferSize;
            this._freeSegmentsQueue = new ConcurrentQueue<BufferSegment<T>>();
            this.MainBuffer = new T[initialCapacity * subBufferSize];
            for (int i = 0; i < initialCapacity * subBufferSize; i += SubBufferLength)
            {
                _freeSegmentsQueue.Enqueue(new BufferSegment<T>(i, SubBufferLength, this));
            }
            this._resizeTime = new Stopwatch();
            this._resizeTime.Start();
		}
        /// <summary>
        /// Increases the length of the main buffer two times, similar to
        /// what C++ vector class does
        /// </summary>
        private void ResizeBuffer()
        {
            if (_resizeTime.ElapsedMilliseconds > 5)
            {
                _resizeTime.Restart();
                Console.WriteLine("resizing! {0}", MainBuffer.Length / SubBufferLength);
                int originalLength = MainBuffer.Length;
                Array.Resize<T>(ref MainBuffer, originalLength * 2);
                for (int i = originalLength; i < MainBuffer.Length; i += SubBufferLength)
                {
                    _freeSegmentsQueue.Enqueue(new BufferSegment<T>(i, SubBufferLength, this));
                }
            }
        }

        /// <summary>
        /// Gets a BufferSegment instance containing the index and length of a block of
        /// memory from the main buffer
        /// </summary>
        public BufferSegment<T> GetSegment()
        {
            BufferSegment<T> freeSegment;
            if (_freeSegmentsQueue.TryDequeue(out freeSegment))
            {
                return freeSegment;
            }
            else
            {
                ResizeBuffer();
                freeSegment = GetSegment();
            }
            return freeSegment;
        }

        /// <summary>
        /// Frees a BufferSegment to be reused later
        /// </summary>
        /// <param name="segment">Segment released</param>
        public void FreeBuffer(BufferSegment<T> segment)
        {
            _freeSegmentsQueue.Enqueue(segment);
        }
}
BufferSegment.cs:

Code:
    /// <summary>
    /// Belongs to a BufferPool instance, contains an Index and Length 
    /// to represent the buffer segment
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class BufferSegment<T>
    {
        /// <summary>
        /// BufferPool instance that created this BufferSegment
        /// </summary>
        public BufferPool<T> Owner;
 
        /// <summary>
        /// Gets the index of the main buffer where this segment is located
        /// </summary>
        public int Index { get; private set; }
 
        /// <summary>
        /// Gets the length of this segment
        /// </summary>
        public int Length { get; private set; }
 
        /// <summary>
        /// Belongs to a BufferPool instance, contains an Index and Length 
        /// to represent the buffer segment
        /// </summary>
        public BufferSegment(int index, int length, BufferPool<T> owner)
        {
            Owner = owner;
            Index = index;
            Length = length;
        }
 
        /// <summary>
        /// Frees a BufferSegment to be reused later
        /// </summary>
        public void Free()
        {
            Owner.FreeBuffer(this);
        }
    }

SocketAsyncEventArgs Pool


According to msdn again, SocketAsyncEventArgs can be reused to reduce the GC pressure, quite easy to implement, the class just wraps a queue of SAEA's

SocketAsyncEventArgsPool.cs:
Code:
    /// <summary>
    /// Encapsulates a collection of SocketAsyncEventArgs that can be reused
    /// </summary>
public class SocketAsyncEventArgsPool
{
        /// <summary>
        /// Contains all the SocketAsyncEventArgs that aren't being used by any
        /// AsyncClient instance
        /// </summary>
        ConcurrentQueue<SocketAsyncEventArgs> _socketAsyncEventArgsQueue;
 
        /// <summary>
        /// Encapsulates a collection of SocketAsyncEventArgs that can be reused
        /// </summary>
        public SocketAsyncEventArgsPool()
{
            _socketAsyncEventArgsQueue = new ConcurrentQueue<SocketAsyncEventArgs>();
}
 
        /// <summary>
        /// Try to gets a non used SocketAsyncEventArgs from the pool
        /// </summary>
        /// <param name="e">Reference that will hold the non used
        /// SocketAsyncEventArgs instance</param>
        /// <returns>Returns True if the operation was completed succesfully,
        /// False otherwise</returns>
        public bool TryPop(out SocketAsyncEventArgs e)
        {
            return _socketAsyncEventArgsQueue.TryDequeue(out e);
        }
 
        /// <summary>
        /// Adds a SocketAsyncEventArgs instance that is not being used 
        /// to the pool
        /// </summary>
        /// <param name="e">SocketAsyncEventArgs that will be added to the pool</param>
        public void Push(SocketAsyncEventArgs e)
        {
            _socketAsyncEventArgsQueue.Enqueue(e);
        }
}

Circular Stream


To avoid packet fragmentation, instead of receiving the first 2 bytes (length), decrypt, and receive the 'length' bytes, i decided to create a circular stream, a precreated and big enough byte array and some maths to know where the next read/write operations must be done, after reading the Length of the stream will be decreased, and vice-versa when writing, this class inherits some functions from the Stream class, and wraps a CircularBuffer, a class similar to what i have described but generic (stream works with bytes, that was the only reason to separate them).

So to resume, the socket will always receive the max. amount possible of bytes, write them to the circular stream, and repeat, on the other hand, for processing, it's possible to read the necesary bytes to know the packet length, and check if enough bytes have been received or not.

CircularBuffer.cs
Code:
    /// <summary>
    /// A buffer implemented like a queue with a maximum length that will not allocate
    /// extra memory to enqueue data, once maximum length is reached, no more data can
    /// be enqueued.
    /// </summary>
    /// <typeparam name="T">Type of the elements that will hold the buffer</typeparam>
    public class CircularBuffer<T> : IEnumerable<T>
    {
        /// <summary>
        /// Array containing the elements of the buffer
        /// </summary>
        private T[] _buffer;

        /// <summary>
        /// Gets the maximum length of the buffer
        /// </summary>
        public int Length { get { return _buffer.Length; } }
        
        /// <summary>
        /// Gets the index of the buffer where the next "Add" operation will be executed
        /// </summary>
        public int Head{get; private set;}

        /// <summary>
        /// Gets the index of the buffer where the next "Get" operation will be executed
        /// </summary>
        public int Tail{get; private set;}

        /// <summary>
        /// Gets the amount of elements in the buffer
        /// </summary>
        public int Count { get; private set; }

        /// <summary>
        /// A buffer implemented like a queue with a maximum length that will not allocate
        /// extra memory to enqueue data, once maximum length is reached, no more data can
        /// be enqueued.
        /// </summary>
        /// <param name="capacity">Maximum amount of elements that the buffer will hold</param>
        public CircularBuffer(int capacity)
        {
            _buffer = new T[capacity];
        }

        /// <summary>
        /// Changes the Write index of the buffer.
        /// </summary>
        /// <param name="amount">amount that the Write index will be incremented.</param>
        private void IncrementHead(int amount)
        {
            if (Head + amount >= Length)
            {
                Head = Head + amount - Length;
            }
            else
            {
                Head += amount;
            }
            Count += amount;
        }

        /// <summary>
        /// Changes the Read index of the buffer.
        /// </summary>
        /// <param name="amount">Amount that the Read index will be incremented.</param>
        private void IncrementTail(int amount)
        {
            if (Tail + amount >= Length)
            {
                Tail = Tail + amount - Length;
            }
            else
            {
                Tail += amount;
            }
            Count -= amount;
        }

        /// <summary>
        /// Adds a new element to the buffer and increases the Write index by one,  returns
        /// false if there is not enough space to add the elements.
        /// </summary>
        /// <param name="value">Value that will be added to the buffer</param>
        /// <returns></returns>
        public bool TryAdd(T value)
        {
            bool succesfullyAdded = false;
            if (Count < Length)
            {
                _buffer[Head] = value;
                succesfullyAdded = true;
                IncrementHead(1);
            }
            return succesfullyAdded;
        }

        /// <summary>
        /// Adds an array of elements to the buffer and increases the Write index 
        /// acording to the length parameter, returns false if there is not enough space
        /// to add the elements.
        /// </summary>
        /// <param name="values">Array containing the values that will be added</param>
        /// <param name="index">Index where is located the first element that will be copied from the array</param>
        /// <param name="length">Amount of elements that will be copied from the array</param>
        /// <returns></returns>
        public bool TryAdd(T[] values, int index, int length)
        {
            bool succesfullyAdded = false;
            if (this.Count + length <= this.Length && values.Count() >= index + length)
            {
                int copyLength = Math.Min(this.Length - Head, length);
                Array.Copy(values, index, _buffer, Head, copyLength);
                IncrementHead(copyLength);
                if (copyLength < length)
                    succesfullyAdded = TryAdd(values, index + copyLength, length - copyLength);
                else
                    succesfullyAdded = true;
            }
            return succesfullyAdded;
        }

        /// <summary>
        /// Gets the first element from the buffer and increases the Read index by one, returns
        /// false if there are not any element in the buffer
        /// </summary>
        /// <param name="value"></param>
        /// <returns></returns>
        public bool TryGet(out T value)
        {
            bool succesfullyRemoved = false;
            value = default(T);
            if (Count > 0)
            {
                value = _buffer[Tail];
                _buffer[Tail] = default(T);
                IncrementTail(1);
                succesfullyRemoved = true;
            }
            return succesfullyRemoved;
        }

        /// <summary>
        /// Copies values from the buffer to the readBuffer parameter at the specified index, 
        /// returns false if the length parameter is major to the amount of elements in the buffer
        /// </summary>
        /// <param name="readBuffer">Buffer where the elements will be copied</param>
        /// <param name="index">Index of the readBuffer where the elements will be copied</param>
        /// <param name="length">amount of elements that will be read from the buffer</param>
        /// <returns></returns>
        public bool TryGet(T[] readBuffer, int index, int length)
        {
            bool succesfullyRemoved = false;
            if (this.Count - length >= 0 && readBuffer.Count() >= index + length)
            {
                int copyLength = Math.Min(this.Length - Tail, length);
                Array.Copy(_buffer, Tail, readBuffer, index, copyLength);
                IncrementTail(copyLength);
                if (copyLength < length)
                    succesfullyRemoved = TryGet(readBuffer, index + copyLength, length - copyLength);
                else
                    succesfullyRemoved = true;
            }
            return succesfullyRemoved;
        }

        /// <summary>
        /// Delete values from the buffer and increases the Read index
        /// </summary>
        /// <param name="amount">Amount of elements that will be skipped</param>
        public void Skip(int amount)
        {
            amount = Math.Min(amount, Count);
            for (int i = 0; i < amount; i++)
            {
                _buffer[Tail] = default(T);
                IncrementTail(1);
            }
        }

        public IEnumerator<T> GetEnumerator()
        {
            int bufferIndex = Tail;
            for (int i = 0; i < Count; i++, bufferIndex++)
            {
                if (bufferIndex >= Length)
                    bufferIndex = 0;
                yield return _buffer[bufferIndex];
            }
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            int bufferIndex = Tail;
            for (int i = 0; i < Count; i++, bufferIndex++)
            {
                if (bufferIndex >= Length)
                    bufferIndex = 0;
                yield return _buffer[bufferIndex];
            }
        }

        public T this[int index]
        {
            get
            {
                if (Count >= index)
                {
                    if (index + Tail >= Length)
                        index = Tail + index - Length;
                    else
                        index += Tail;
                    return _buffer[index];
                }
                else return default(T);
            }
        }
    }
CircularStream.cs
Code:
    /// <summary>
    /// Wraps a CircularBuffer of bytes and adds stream properties to it
    /// </summary>
    public sealed class CircularStream : Stream, IDisposable
    {
        /// <summary>
        /// Buffer containing the stream data
        /// </summary>
        private CircularBuffer<byte> _baseBuffer;

        /// <summary>
        /// Object for synchronization between different threads
        /// </summary>
        private object _syncRoot = new object();

        /// <summary>
        /// Gets an object for synchronization between different threads
        /// </summary>
        public object SyncRoot { get { return _syncRoot; } }

        /// <summary>
        /// Gets the length of the stream
        /// </summary>
        public override long Length
        {
            get { return _baseBuffer.Count; }
        }

        /// <summary>
        /// Returns true if at least one byte can be read from the stream
        /// </summary>
        public override bool CanRead
        {
            get { return _baseBuffer.Count > 0; }
        }

        /// <summary>
        /// Returns true if at least one byte can be skiped
        /// </summary>
        public override bool CanSeek
        {
            get { return CanRead; }
        }

        /// <summary>
        /// Returns true if at least one byte can be written to the stream
        /// </summary>
        public override bool CanWrite
        {
            get { return _baseBuffer.Count < _baseBuffer.Length; }
        }

        /// <summary>
        /// Wraps a CircularBuffer of bytes and adds stream properties to it
        /// </summary>
        public CircularStream(int length)
        {
            _baseBuffer = new CircularBuffer<byte>(length);
        }

        /// <summary>
        /// Release all the resources used by the stream
        /// </summary>
        public override void Close()
        {
            base.Close();
            _baseBuffer = null;
        }

        /// <summary>
        /// Writes a byte in the stream
        /// </summary>
        /// <param name="value">byte that will be written into the stream</param>
        public override void WriteByte(byte value)
        {
            if (!_baseBuffer.TryAdd(value))
                throw new EndOfStreamException("Can not write more values to the Circular stream!");
        }

        /// <summary>
        /// Writes an array of bytes to the stream
        /// </summary>
        /// <param name="buffer">array containing the bytes that will be written</param>
        /// <param name="offset">Index of the first byte from the array that will be written</param>
        /// <param name="count">Amount of bytes that will be written</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
           if(!_baseBuffer.TryAdd(buffer, offset, count))
               throw new EndOfStreamException("Can not write more values to the Circular stream!");
        }

        /// <summary>
        /// Reads on byte from the stream, returns -1 if there are no elements in the stream
        /// </summary>
        /// <returns></returns>
        public override int ReadByte()
        {
            byte val;
            if (!_baseBuffer.TryGet(out val))
                return -1;
            return val;
        }

        /// <summary>
        /// Reads a determined amount of bytes and copy them inside the buffer parameter, returns
        /// the amount of bytes read of -1 if no element could be read
        /// </summary>
        /// <param name="buffer">Array where the read bytes will be copied</param>
        /// <param name="offset">Index of the array where the first element will be copied</param>
        /// <param name="count">Amount of bytes that will be read</param>
        /// <returns></returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (!_baseBuffer.TryGet(buffer, offset, count))
                return -1;
            return count;
        }

        /// <summary>
        /// Reads a short from the stream
        /// </summary>
        /// <returns></returns>
        public short ReadInt16()
        {
            byte[] bytes = new byte[2];
            if (Read(bytes, 0, 2) > 0)
                return BitConverter.ToInt16(bytes, 0);
            else
                throw new EndOfStreamException();
        }

        /// <summary>
        /// Reads an int from the stream
        /// </summary>
        /// <returns></returns>
        public int ReadInt32()
        {
            byte[] bytes = new byte[4];
            if (Read(bytes, 0, 4) > 0)
                return BitConverter.ToInt32(bytes, 0);
            else
                throw new EndOfStreamException();
        }

        /// <summary>
        /// Reads a long from the stream
        /// </summary>
        /// <returns></returns>
        public long ReadInt64()
        {
            byte[] bytes = new byte[8];
            if (Read(bytes, 0, 8) > 0)
                return BitConverter.ToInt64(bytes, 0);
            else
                throw new EndOfStreamException();
        }

        /// <summary>
        /// Reads a byte from the stream without removing it
        /// </summary>
        /// <returns></returns>
        public byte PeekByte()
        {
            if (Length < sizeof(byte))
                throw new EndOfStreamException();
            return _baseBuffer[0];
        }

        /// <summary>
        /// Reads a short from the stream without removing it
        /// </summary>
        /// <returns></returns>
        public ushort PeekUInt16()
        {
            if (Length < sizeof(ushort))
                throw new EndOfStreamException();
            ushort value = 0;
            for (int i = 0; i < sizeof(ushort); i++)
                value += (ushort)(_baseBuffer[i] << (8 * i));
            return value;
        }

        /// <summary>
        /// Reads a int from the stream without removing it
        /// </summary>
        /// <returns></returns>
        public uint PeekUInt32()
        {
            if (Length < sizeof(uint))
                throw new EndOfStreamException();
            uint value = 0;
            for (int i = 0; i < sizeof(uint); i++)
                value += (uint)_baseBuffer[i] << (8 * i);
            return value;
        }

        /// <summary>
        /// Reads a long from the stream without removing it
        /// </summary>
        /// <returns></returns>
        public ulong PeekUInt64()
        {
            if (Length < sizeof(ulong))
                throw new EndOfStreamException();
            ulong value = 0;
            for (int i = 0; i < sizeof(ulong); i++)
                value += (ulong)_baseBuffer[i] << (8 * i);
            return value;
        }

        /// <summary>
        /// Clears the stream
        /// </summary>
        public override void Flush()
        {
            _baseBuffer.Skip(_baseBuffer.Count);
        }

        /// <summary>
        /// Release all the resources used by the stream
        /// </summary>
        public void Dispose()
        {
            Close();
        }

        // Not implemented functions

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        {
            throw new NotImplementedException();
        }

        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        {
            throw new NotImplementedException();
        }

        public override bool CanTimeout
        {
            get
            {
                throw new NotImplementedException();
            }
        }

        public override System.Runtime.Remoting.ObjRef CreateObjRef(Type requestedType)
        {
            throw new NotImplementedException();
        }

        public override int EndRead(IAsyncResult asyncResult)
        {
            throw new NotImplementedException();
        }

        public override void EndWrite(IAsyncResult asyncResult)
        {
            throw new NotImplementedException();
        }

        public override object InitializeLifetimeService()
        {
            throw new NotImplementedException();
        }

        public override int ReadTimeout
        {
            get
            {
                throw new NotImplementedException();
            }
            set
            {
                throw new NotImplementedException();
            }
        }

        public override int WriteTimeout
        {
            get
            {
                throw new NotImplementedException();
            }
            set
            {
                throw new NotImplementedException();
            }
        }

        public override long Position
        {
            get
            {
                throw new NotImplementedException();
            }
            set
            {
                throw new NotImplementedException();
            }
        }
    }
The Common things


Based this a lot on how sources uses the APM for their socket servers, an AsyncServer class that will accept the connections, an AsyncClient class that will do the I/O operations and the Disconnect metod, and the SocketEvents class, that will hold the delegates for notifying when an asynchronous operations has finished.
All them are generic only to avoid the common
MyClient = asyncClient.UserToken as MyClient;
^^

AsyncServer.cs
Code:
    /// <summary>
    /// Wraps a Socket instance to accept incoming connections asynchronously using 
    /// SocketAsyncEventArgs
    /// </summary>
    /// <typeparam name="TUserToken"></typeparam>
    public class AsyncServer<TUserToken> : SocketEvents<TUserToken>
    {   
        /// <summary>
        /// Buffer pool where the AsyncClients will get buffer segments for 
        /// their I/O operations
        /// </summary>
        public BufferPool<byte> BufferPool;

        /// <summary>
        /// Socket instance used to accept incoming connections
        /// </summary>
        public Socket Connection { get; private set; }

        /// <summary>
        /// Port that the Connection is listening to
        /// </summary>
        public int Port { get; private set; }

        /// <summary>
        /// Wraps a Socket instance to accept incoming connections asynchronously using 
        /// SocketAsyncEventArgs
        /// </summary>
        /// <param name="estimatedConnetions">Aproximation of the amount of concurrent connections
        /// that will hande the server</param>
        /// <param name="MaxPacketLength">Maximum amount of bytes that will be received</param>
        /// <param name="addressFamily">AddressFamily used in the Socket constructor</param>
        /// <param name="socketType">SocketType used in the Socket constructor</param>
        /// <param name="protocolType">ProtocolType used in the Socket constructor</param>
        public AsyncServer(int estimatedConnetions, int MaxPacketLength, AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
        {
            this.Connection = new Socket(addressFamily, socketType, protocolType);
            BufferPool = new BufferPool<byte>(estimatedConnetions, MaxPacketLength);
        }

        /// <summary>
        /// Binds the Connection to a local endpoint using a specified port and a local IP Address
        /// </summary>
        /// <param name="port">local port that will the connection will be binded to</param>
        public void Bind(int port)
        {
            Connection.Bind(new IPEndPoint(IPAddress.Any, port));
            Port = port;
        }

        /// <summary>
        /// Makes the Connection socket start listening
        /// </summary>
        /// <param name="backlog">Maximum amount of pending connection</param>
        public void Listen(int backlog)
        {
            Connection.Listen(backlog);
            SocketAsyncEventArgs e = new SocketAsyncEventArgs();
            e.Completed += AcceptCompleted;
            bool willRaiseEvent = Connection.AcceptAsync(e);
            if (!willRaiseEvent)
                ProcessAccept(e);
        }

        /// <summary>
        /// Action invoked when an accept operation has finished asynchronously
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void AcceptCompleted(object sender, SocketAsyncEventArgs e)
        {
            ProcessAccept(e);
        }

        /// <summary>
        /// Action raised when an accept operation has finished
        /// </summary>
        /// <param name="e"></param>
        private void ProcessAccept(SocketAsyncEventArgs e)
        {
            if (e.SocketError == SocketError.Success)
            {
                AsyncClient<TUserToken> newClient = new AsyncClient<TUserToken>(e.AcceptSocket, this);
                base.Call_SocketConnected(newClient);
                newClient.StartReceiving();
            }
            e.AcceptSocket = null;
            bool willRaiseEvent = Connection.AcceptAsync(e);
            if (!willRaiseEvent)
                ProcessAccept(e);
        }
    }
AsyncClient.cs
Code:
    /// <summary>
    /// Wraps a System.Net.Sockets.Socket instance to provide an easy way
    /// to send/receive data asynchronously using SocketAsyncEventArgs
    /// </summary>
    public class AsyncClient<TUserToken> : SocketEvents<TUserToken>
	{
        /// <summary>
        /// Pool of SocketAsyncEventArgs to use on the socket Input/Output (data transfer) 
        /// actions
        /// </summary>
        private static SocketAsyncEventArgsPool IOEventsPool = new SocketAsyncEventArgsPool();

        /// <summary>
        /// To avoid double Disconnect calls
        /// </summary>
        private bool _disconnected = false;

        /// <summary>
        /// SocketAsyncEventArgs used to receive data asynchronously, only one instance must
        /// be used for receiving, must be returned to the IOEventsPool when the Socket
        /// has been disconnected
        /// </summary>
        private SocketAsyncEventArgs _receiveEvent;

        /// <summary>
        /// The server that created the this AsyncClient instance, inherits the BufferPool
        /// class so one main buffer is kept per server
        /// </summary>
        private AsyncServer<TUserToken> _ownerServer;

        /// <summary>
        /// The segment that the receiveEvent is associated with
        /// </summary>
        private BufferSegment<byte> _bufferSegment;

        /// <summary>
        /// Stream where the packets received are written
        /// </summary>
        public CircularStream DataStream;

        /// <summary>
        /// Gets the Socket wrapped by this AsyncClient instance
        /// </summary>
        public Socket Connection { get; private set; }

        /// <summary>
        /// Gets or sets the object that is used to recognize this AsyncClient instance
        /// </summary>
        public TUserToken Owner;
        
        /// <summary>
        /// Wraps a System.Net.Sockets.Socket instance to provide an easy way
        /// to send/receive data asynchronously using SocketAsyncEventArgs
        /// </summary>
        /// <param name="socket">Socket that will be used for sending/receiving data</param>
        /// <param name="server">AsyncServer where this AsyncClient instance was created</param>
        public AsyncClient(Socket socket, AsyncServer<TUserToken> server)
        {
            server.CopyTo(this);
            this.Connection = socket;
            this._ownerServer = server;
        }

        /// <summary>
        /// Signals the Socket to start receiving packets
        /// </summary>
        public void StartReceiving()
        {
            if (_receiveEvent == null)
            {
                if (!IOEventsPool.TryPop(out _receiveEvent))
                {
                    _receiveEvent = new SocketAsyncEventArgs();
                }
                _bufferSegment = _ownerServer.BufferPool.GetSegment();
                _receiveEvent.Completed += Completed;
                _receiveEvent.SetBuffer(_ownerServer.BufferPool.MainBuffer, _bufferSegment.Index, _bufferSegment.Length);
                bool willRaiseEvent = Connection.ReceiveAsync(_receiveEvent);
                if (!willRaiseEvent)
                    ProcessReceive(_receiveEvent);
            }
        }

        /// <summary>
        /// Changes the maximum length of the DataReceived stream, if there are
        /// data in the stream will be copied to a new stream
        /// </summary>
        /// <param name="length"></param>
        public void SetMaxDataStreamLength(int length)
        {
            CircularStream newStream = new CircularStream(length);
            if (DataStream != null)
            {
                byte[] oldBuffer = new byte[DataStream.Length];
                DataStream.Read(oldBuffer, 0, oldBuffer.Length);
                newStream.Write(oldBuffer, 0, (int)Math.Min(oldBuffer.Length, newStream.Length));
            }
            DataStream = newStream;
        }

        /// <summary>
        /// Sends a byte array asynchronously using SocketAsyncEventArgs
        /// </summary>
        /// <param name="buffer">Data that will be sent</param>
        public void Send(byte[] buffer)
        {
            SocketAsyncEventArgs e;
            if (!IOEventsPool.TryPop(out e))
            {
                e = new SocketAsyncEventArgs();
            }
            e.Completed += Completed;
            BufferSegment<byte> segment = _ownerServer.BufferPool.GetSegment();
            e.UserToken = segment ;
            e.SetBuffer(segment.Owner.MainBuffer, segment.Index, buffer.Length);
            Buffer.BlockCopy(buffer, 0, e.Buffer, segment.Index, buffer.Length);
            bool willRaiseEvent = Connection.SendAsync(e);
            if (!willRaiseEvent)
                ProcessSend(e);
        }

        /// <summary>
        /// Action invoked when a SendAsync operation has finished
        /// </summary>
        /// <param name="e"></param>
        private void ProcessSend(SocketAsyncEventArgs e)
        {
            BufferSegment<byte> segment = e.UserToken as BufferSegment<byte>;
            segment.Free();
            e.SetBuffer(null, 0, 0);
            e.Completed -= Completed;
            IOEventsPool.Push(e);
            
        }

        /// <summary>
        /// Action invoked when an Input/Output operation has finished asynchronously
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void Completed(object sender, SocketAsyncEventArgs e)
        {
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
            }
        }

        /// <summary>
        /// Action invoked when a ReceiveAsync operation has finished
        /// </summary>
        /// <param name="e"></param>
        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            if (e.SocketError == SocketError.Success && e.BytesTransferred > 0 && Connection.Connected)
            {
                try
                {
                    DataStream.Write(e.Buffer, _bufferSegment.Index, e.BytesTransferred);
                    base.Call_DataReceived(this, e.Buffer, e.BytesTransferred);
                    bool willRaiseEvent = Connection.ReceiveAsync(e);
                    if (!willRaiseEvent)
                        ProcessReceive(e);
                }
                catch { Disconnect(); }
            }
            else
            {
                Disconnect();
            }
        }

        /// <summary>
        /// Disconnects the Socket wrapped and raises the Events.SocketDisconnected action
        /// </summary>
        private void Disconnect()
        {
            if (!_disconnected)
            {
                _disconnected = true;
                try
                {
                    _receiveEvent.SetBuffer(null, 0, 0);
                    _receiveEvent.Completed -= Completed;
                    IOEventsPool.Push(_receiveEvent);
                    _bufferSegment.Free();
                    Connection.Close();
                    Connection.Dispose();
                    DataStream.Dispose();
                }
                finally
                {
                    Call_SocketDisconnected(this);
                }
            }
        }
    }
SocketEvents.cs
Code:
    public delegate void SocketConnectionEventHandler<TClient>(AsyncClient<TClient> sender);
    public delegate void SocketTransferEventHandler<TClient>(AsyncClient<TClient> sender, byte[] data, int length);

    /// <summary>
    /// Contains the delegates invoked when a socket is accepted, disconnected or it
    /// has received data
    /// </summary>
    /// <typeparam name="TUserToken"></typeparam>
    public class SocketEvents<TUserToken>
    {
        /// <summary>
        /// Notifies to teh server that a new socket has been accepted.
        /// </summary>
        public event SocketConnectionEventHandler<TUserToken> SocketConnected;

        /// <summary>
        /// Notifies to the server that a client has been disconnected.
        /// </summary>
        public event SocketConnectionEventHandler<TUserToken> SocketDisconnected;

        /// <summary>
        /// Notifies to the server that a client has received data, then the data will
        /// be writed to the DataStream, use this event to implement a cipher.
        /// </summary>
        public event SocketTransferEventHandler<TUserToken> DataReceived;

        /// <summary>
        /// Copy all the delegates from the current instance to another SocketEvents instance
        /// </summary>
        /// <param name="target">SocketEvenst instance where the delegates will be copied</param>
        public void CopyTo(SocketEvents<TUserToken> target)
        {
            target.SocketDisconnected = SocketDisconnected;
            target.DataReceived = DataReceived;
            target.SocketConnected = SocketConnected;
        }

        protected void Call_SocketConnected(AsyncClient<TUserToken> client)
        {
            if (SocketConnected != null)
                SocketConnected(client);
        }

        protected void Call_SocketDisconnected(AsyncClient<TUserToken> client)
        {
            if (SocketDisconnected != null)
                SocketDisconnected(client);
        }

        protected void Call_DataReceived(AsyncClient<TUserToken> client, byte[] data, int amount)
        {
            if (DataReceived != null)
                DataReceived(client, data, amount);
        }
    }
Using the code


I'm lazy now so here you have an example, it's quite a bad example but it worked :P

Code:
            AsyncServer<int> server = new AsyncServer<int>(0, 1024, AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            server.Events.SocketConnected = Connected;
            server.Events.SocketDisconnected = Disconnected;
            server.Events.DataReceived = Received;
            server.Bind(9958);
            server.Listen(100);
the methods Connect, Disconnected and Received:

Code:
        static void Connected(AsyncClient<int> client)
        {
            Console.WriteLine("connected");
            client.SetMaxDataStreamLength(1024); // Initializes the DataStream for the client
            // Here, i assume that NEVER, there will be more than 1024 bytes that must be processed
            // A big enough number must be used when using this socket server with apps like conquer
            // Where packet processing migth become slow
        }
 
        static void Received(AsyncClient<int> client)
        {
            while (client.DataStream.Length > 0)
            {
                if (client.DataStream.Length >= 2) // assuming that the first 2 bytes are the total size
                {
                    short size = client.DataStream.ReadInt16();
                    if (client.DataStream.Length >= size - 2)
                    {
                        byte[] bytes = new byte[size - 2];
                        client.DataStream.Read(bytes, 0, size - 2);
                        Console.WriteLine(Encoding.ASCII.GetString(bytes));
                    }
                    else break; //Size should be queued so on next notification that size will be checked
                }
                else break;
            }
        }
 
        static void Disconnected(AsyncClient<int> client)
        {
            Console.WriteLine("DC!");
        }


To finish, i haven't fully tested this yet so some errors may happen, nor i have fully coded it yet, wanted to implement more Read functions to the CircularStream, but meh i'm lazy

Forgot to say, there is an extra parameter in the AsyncServer constructor (footer length), wanted to remove the "TQClient" seal before writing the packet to the stream, but didn't find any reason to do it.
U2_Caparzo is offline  
Thanks
3 Users
Old 05/28/2014, 10:24   #2
 
InfamousNoone's Avatar
 
elite*gold: 20
Join Date: Jan 2008
Posts: 2,012
Received Thanks: 2,882
i skimmed this, things to consider:
use async ctp for a better cleaner read/code
use 'event' keyword for events (Connected, Disconnected, etc)
InfamousNoone is offline  
Thanks
1 User
Old 05/28/2014, 11:48   #3


 
Korvacs's Avatar
 
elite*gold: 20
Join Date: Mar 2006
Posts: 6,125
Received Thanks: 2,518
An interesting read, let us know how it performs .
Korvacs is offline  
Thanks
1 User
Old 05/28/2014, 12:13   #4
 
U2_Caparzo's Avatar
 
elite*gold: 0
Join Date: Aug 2011
Posts: 314
Received Thanks: 90
Quote:
Originally Posted by InfamousNoone View Post
i skimmed this, things to consider:
Thanks for replying (was waiting for a comment )

Quote:
Originally Posted by InfamousNoone View Post
use async ctp for a better cleaner read/code
Had to search about the async ctp , is that the async/await keywords, right? if that is the case... :'( , got a very, very old netbook that barely runs VS 2010 so i don't know nothing about it, sorry

Quote:
Originally Posted by InfamousNoone View Post
use 'event' keyword for events (Connected, Disconnected, etc)
i surely will, thinking actually in make the AsyncServer/Client classes inherit the SocketEvents and provide a protected method to invoke the events.

yet i would like to know if the circular stream concept/idea is correct, is the more experimental code for me in this thread since i haven't seen anything similar before, i remember have read somewhere else about the same but not even sure if that was about sockets tho.
For me it looks good, but there are many members here with much more programming experience than me, it's always better to have others people opinion

Quote:
Originally Posted by Korvacs View Post
An interesting read, let us know how it performs .
Thanks for replying, i still have to test and hopefully stress it (not that hard when u got an intel atom processor i think ), will try to do it today, but it's 6:20 am here and won't find people to help with that (don't know other way for test it :/)
U2_Caparzo is offline  
Old 05/28/2014, 12:20   #5
 
elite*gold: 0
Join Date: May 2014
Posts: 25
Received Thanks: 12
Looks nicely done, however the naming conventions are a little messy, nothing you can't fix by renaming variables though.

According to my own experience its performing better if you have a separate buffer for receiving and sending, you might want to look into that.
Logic* is offline  
Thanks
1 User
Old 05/28/2014, 12:39   #6
 
U2_Caparzo's Avatar
 
elite*gold: 0
Join Date: Aug 2011
Posts: 314
Received Thanks: 90
Quote:
Originally Posted by Logic* View Post
Looks nicely done, however the naming conventions are a little messy, nothing you can't fix by renaming variables though.

According to my own experience its performing better if you have a separate buffer for receiving and sending, you might want to look into that.
Not really good with that

Nice to hear someone who have used something similar before, i'll have your suggestion in mind when testing, for the design i will have to edit the code more than i would want ^^
U2_Caparzo is offline  
Old 05/28/2014, 15:21   #7
 
elite*gold: 0
Join Date: Sep 2013
Posts: 197
Received Thanks: 140
From what I understood, the biggest benefit of using SocketAsyncEventArgs for your server is that is allows you to avoid all of the allocation that's associated with the "regular" async socket pattern (Begin/Endxxx).

I don't think you're going to see any truly noticable performance improvements though, unless you're stress testing your server with several thousand simultaneous connections of course.

Still, nice work though! It definitely takes a lot more time and work getting the SocketAsyncEventArgs right compared to the normal async socket pattern, so it's nice to see a guide like this.
SteveRambo is offline  
Thanks
1 User
Old 05/28/2014, 16:57   #8
 
U2_Caparzo's Avatar
 
elite*gold: 0
Join Date: Aug 2011
Posts: 314
Received Thanks: 90
Quote:
Originally Posted by SteveRambo View Post
From what I understood, the biggest benefit of using SocketAsyncEventArgs for your server is that is allows you to avoid all of the allocation that's associated with the "regular" async socket pattern (Begin/Endxxx).

I don't think you're going to see any truly noticable performance improvements though, unless you're stress testing your server with several thousand simultaneous connections of course.

Still, nice work though! It definitely takes a lot more time and work getting the SocketAsyncEventArgs right compared to the normal async socket pattern, so it's nice to see a guide like this.
Thanks, yeah on a conquer server (after all this forum is about conquer ^^) i doubt that there will be any noticeable difference, SAEA targets mainly to avoid allocation of new objects and memory fragmentation, never heard about problems with that here yet it's not useless nor boring to code it and the features that it can implement so here i'm

Still need to update the code, there are some dumb errors and a possible memory leak when the amount of concurrent connection increases (just detected a synchronization error on the buffer pool, if 2+ threads calls GetSegment and the queue is empty, each thread will invoke the Resize method and thus the memory increases a lot! )
U2_Caparzo is offline  
Thanks
1 User
Old 05/29/2014, 00:20   #9
 
elite*gold: 0
Join Date: Oct 2009
Posts: 768
Received Thanks: 550
I use something like this myself but you might want to consider creating the buffer pool with something like 1000 * subbuffer size (enough for 1,000 clients). In the case of a CO server you might never use that much but, if you do, you won't need more than 2 resizes afterwards. Creating the buffer pool like that wouldn't be an issue nowadays since you can afford 1 MB without any issues.

By the way, you might not want to handle the receive event on the thread that you get the socket event. They are i/o threads and it might be useful to handle the packets on a separate thread pool.
I use a threadpool to handle the send as well, that way I don't have to wait a lot when locking to add in a queue for send.

The code looks nice and the use of the Stream base class is a nice idea, but you might want to implement something like PeekUInt16() to check for the size so that when you extract the packet it's the whole thing.
-impulse- is offline  
Thanks
1 User
Old 05/30/2014, 02:21   #10
 
U2_Caparzo's Avatar
 
elite*gold: 0
Join Date: Aug 2011
Posts: 314
Received Thanks: 90
Quote:
Originally Posted by -impulse- View Post
I use something like this myself but you might want to consider creating the buffer pool with something like 1000 * subbuffer size (enough for 1,000 clients). In the case of a CO server you might never use that much but, if you do, you won't need more than 2 resizes afterwards. Creating the buffer pool like that wouldn't be an issue nowadays since you can afford 1 MB without any issues.
Yeah, after all that is how it's supposed to work, don't know why didn't that

Quote:
Originally Posted by -impulse- View Post
By the way, you might not want to handle the receive event on the thread that you get the socket event. They are i/o threads and it might be useful to handle the packets on a separate thread pool.
it's just a notifier like any other, instead using it for processing i would create separate threads and read data from the stream and process it in these threads.

Quote:
Originally Posted by -impulse- View Post
I use a threadpool to handle the send as well, that way I don't have to wait a lot when locking to add in a queue for send.
I got a bit lost in this part, what i do for sending is poping a pooled SAEA, get a buffer segment send it, and after that push the SAEA to the pool and free the segment, here i'm facing an issue, i thought it was because of lack of sync. when resising the buffer, but i was wrong, tested with a local client program sending every 3 ms 20 bytes to the server, with 200 connections everything is going well for about 20 or 30 seconds, after that it looks like not all the buffers are being freed and the buffer start resizing very very quickly, it's the only issue that i'm having right now, i was thinking in implement something like another stream for sending and create another SAEA for sending, i'd love that you could explain me how your "Send" method works

Quote:
Originally Posted by -impulse- View Post
The code looks nice and the use of the Stream base class is a nice idea, but you might want to implement something like PeekUInt16() to check for the size so that when you extract the packet it's the whole thing.
was thinking exactly that after testing for packet fragmentation issues ^^

Thanks for replying, quite useful feedback
U2_Caparzo is offline  
Old 05/30/2014, 10:24   #11
 
elite*gold: 0
Join Date: Oct 2009
Posts: 768
Received Thanks: 550
Quote:
Originally Posted by U2_Caparzo View Post
I got a bit lost in this part, what i do for sending is poping a pooled SAEA, get a buffer segment send it, and after that push the SAEA to the pool and free the segment, here i'm facing an issue, i thought it was because of lack of sync. when resising the buffer, but i was wrong, tested with a local client program sending every 3 ms 20 bytes to the server, with 200 connections everything is going well for about 20 or 30 seconds, after that it looks like not all the buffers are being freed and the buffer start resizing very very quickly, it's the only issue that i'm having right now, i was thinking in implement something like another stream for sending and create another SAEA for sending, i'd love that you could explain me how your "Send" method works
I noticed this a while back... for some reason async send doesn't comeplete directly even though it's supposed to be just a push. Back when I was working on Trinity calling BeginSend() would take even up to 17 seconds and I have no idea why. SendAsync works a bit better that way but to make sure I don't run in the same issue I enqueue whatever I have to send in a queue then on another thread I glue as many packets as I can and send them (I have NoDelay enabled - not recommended unless you handle the glue part yourself or you have only big packets).

Like this the send is quite fast but I believe that the issue with the async send still happens: Right now on a server with 410 players online there are 416 receive buffers used at the same time and only about 7 send buffers (including SAEAs) - those are somewhat stuck in the process of receiving/sending.
-impulse- is offline  
Thanks
1 User
Reply

Tags
c#, server, socket, socketasynceventargs


Similar Threads Similar Threads
[C#]Async Socket
01/09/2014 - CO2 PServer Guides & Releases - 2 Replies
Hey , this is my first test of coding a socket :D ServerSocket - iPiraTe - Pastebin.com Notice : i didn't finished it yet .. i wanted to know ur opinion first
A look at async/await and its applications
08/08/2013 - CO2 Programming - 6 Replies
I'm pretty sure most programmers here are still stuck using the legacy async methods (Begin..., End...) instead of the new keywords, so here's a small overview of using them. You can quickly see how applicable they are to development and how useful it can be. Emphasis on "small overview". It also becomes extremely apparent how much cleaner your code becomes using async/await rather than the legacy methods which forced you to write spaghetti-like code.
[Release]Async Sockets (No packet-splitter)
09/20/2012 - CO2 Programming - 16 Replies
So after seeing this thread: http://www.elitepvpers.com/forum/co2-pserver-discu ssions-questions/2129268-problem-packet-splitting. html I thought I'd make some socket server that could handle the packets without needing to split. Source: BasicClient.cs using System;
[Release] SocketSystem using SocketAsyncEventArgs
06/11/2012 - CO2 PServer Guides & Releases - 19 Replies
Its just a base, you can implement it on MANY ways.... I removed my implementations and let just the working base, The socket works very well... and i tested it for some good time... As example i created my self, a account server .... for version 4351 (it still work to 5095 versions tho....) Good luck, and i'd like some good feedbacks .... EDIT: Some advices for implementations would be creating a buffermanager and the socketasynceventargspool to reuse the eventargs, that increases...
Async Socket Wrapper
11/11/2011 - CO2 Programming - 8 Replies
So I was bored and thought I would code a socket wrapper. It's a class library, but full project is available for download + an example use for a socket server. It contains both wrapper for server and client stuff, which means it can be used for either private servers as server-socket or for a proxy. WinAsync - This class contains all the events. public delegate void WinEvent(WinClient wClient); public delegate void WinBufferEvent(WinClient wClient, WinBuffer Buffer); ...



All times are GMT +2. The time now is 03:39.


Powered by vBulletin®
Copyright ©2000 - 2024, Jelsoft Enterprises Ltd.
SEO by vBSEO ©2011, Crawlability, Inc.
This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.

Support | Contact Us | FAQ | Advertising | Privacy Policy | Terms of Service | Abuse
Copyright ©2024 elitepvpers All Rights Reserved.