Friday, September 14, 2012

Many Files Concurrent Read/Write

As part of optimizing a heavily loaded application, I looked for a way to read and write a lot of small to medium sized files. File access was one of the more serious bottlenecks; good caching alleviated it but did not eliminate it, because so many files kept falling out of the cache.

So I thought of three ways of accessing the files and went ahead to see which one is fastest. The first simply attempts the read/write and handles the exception when it collides (OS Locking). The second uses a queue with worker threads (Queued). The third avoids touching a file while it is registered in a read/write lock dictionary (Avoid); multiple reads are allowed but only one write.

At a later stage I added the dictionary check to the queue to make it more efficient: if a specific file is being written, the second attempt on it is re-queued so that requests for other files can be cleared out of the queue.

The results are in files per second, for 50k files, 1000 random iterations over a range of 100 files, averaged over 10 executions:



Action      Threads  OS Locking  Queued 1  Queued 2  Avoid
Read              1        9775      1000       990  10193
Read              5       31887     15019     14560  31585
Read             10       38639     15669     10660  36886
Write             1        3017      1022       993   2791
Write             5        5532      4857      4766   5413
Write            10        5378      4866      4625   5339
Read/Write        1        4504      1050       995   5173
Read/Write        5       10477      6934      7711  10324
Read/Write       10        9742      7867      7824  10773

The results are pretty straightforward. OS Locking uses exceptions to handle collisions, which makes it slower with one thread, but overall it is faster because there is no overhead from other elements.


I wrote two queued classes: one invokes a method on each queue item, the other runs a continuous loop. The continuous loop is faster.

Overall it seems that avoiding the OS file access performs about the same as catching the exception from a colliding access, except when the file operation takes a long time. That is probably because the exception-based method retries every 1 ms after a "file in use" exception, wasting CPU instead of waiting peacefully for the file to be unlocked.
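To make the retry behaviour concrete, here is a minimal sketch of an exception-driven read that sleeps 1 ms between attempts. It is not the code from the test project; the class name RetryingFileReader, the method name and the maxAttempts parameter are made up for illustration.

```csharp
using System;
using System.IO;
using System.Threading;

public static class RetryingFileReader
{
    // Hypothetical helper (not from the test project): reads a whole file and
    // retries every 1 ms while the open fails with a general IOException,
    // which is what a "file in use" collision raises.
    public static byte[] ReadAllBytesWithRetry(string path, int maxAttempts = 1000)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return File.ReadAllBytes(path);
            }
            // A missing file or directory will not fix itself, so those are not retried.
            // Once maxAttempts is reached the filter is false and the exception propagates.
            catch (IOException ex) when (attempt < maxAttempts
                                         && !(ex is FileNotFoundException)
                                         && !(ex is DirectoryNotFoundException))
            {
                // This is the wait the paragraph above refers to: the thread
                // wakes up every millisecond just to try again.
                Thread.Sleep(1);
            }
        }
    }
}
```

A long write holds the file for many of these 1 ms rounds, so every waiting reader pays for a thrown exception on each round.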

It's been an interesting test for me. I thought the queues would do a much faster job, since the OS would only need to handle reading and writing files sequentially. I was wrong, and it was good to find out.

I'm including all the source code for the tests. I have a feeling the DictionaryLock and the QueuedExecution will find a better use in the future.

The DictionaryLock uses a ConcurrentDictionary and a SpinLock to provide a read/write lock per key. The SpinLock is there so that only one thread at a time can insert a new lock into the dictionary.


using System.Collections.Concurrent;
using System.Threading;

/// <summary>
/// Dictionary of locks on TKey
/// </summary>
/// <typeparam name="TKey">Type of key</typeparam>
public class DictionaryLock<TKey>
{
    /// <summary>
    /// Dictionary of locks container
    /// </summary>
    private ConcurrentDictionary<TKey, ReaderWriterLockSlim> _locks = new ConcurrentDictionary<TKey, ReaderWriterLockSlim>();

    /// <summary>
    /// _locks updating lock
    /// </summary>
    private SpinLock _operationlock = new SpinLock();

    /// <summary>
    /// Retrieves the ReaderWriterLock for a specific key
    /// </summary>
    private ReaderWriterLockSlim GetLock(TKey key)
    {
        //check if lock exist
        ReaderWriterLockSlim localock;
        if (_locks.TryGetValue(key, out localock))
        {
            return localock;
        }

        //it doesn't exist, let's create it

        bool lockTaken = false;
        try
        {
            _operationlock.Enter(ref lockTaken);

            //after acquiring the spinlock, recheck the dictionary in case another thread added the same key first
            if (!_locks.TryGetValue(key, out localock))
            {
                localock = new ReaderWriterLockSlim();
                _locks[key] = localock;
            }
        }
        finally
        {
            //release only if the spinlock was actually taken
            if (lockTaken)
                _operationlock.Exit();
        }

        return localock;
    }


    /// <summary>
    /// Enter Reader lock on key
    /// </summary>
    public void EnterReader(TKey key)
    {
        var localock = GetLock(key);

        localock.EnterReadLock();
    }

    /// <summary>
    /// Enter Writer lock on key
    /// </summary>
    public void EnterWriter(TKey key)
    {
        var localock = GetLock(key);

        localock.EnterWriteLock();
    }

    /// <summary>
    /// Check whether the current thread holds the Reader lock on key (IsReadLockHeld is per thread)
    /// </summary>
    public bool IsReaderLocked(TKey key)
    {
        ReaderWriterLockSlim localock;
        if (_locks.TryGetValue(key, out localock))
            return localock.IsReadLockHeld;
        return false;
    }

    /// <summary>
    /// Check whether the current thread holds the Writer lock on key (IsWriteLockHeld is per thread)
    /// </summary>
    public bool IsWriterLocked(TKey key)
    {
        ReaderWriterLockSlim localock;
        if (_locks.TryGetValue(key, out localock))
            return localock.IsWriteLockHeld;
        return false;
    }

    /// <summary>
    /// Exit Reader lock on key
    /// </summary>
    public void ExitReader(TKey key)
    {
        ReaderWriterLockSlim localock;
        if (_locks.TryGetValue(key, out localock))
            localock.ExitReadLock();
    }

    /// <summary>
    /// Exit Writer lock on key
    /// </summary>
    public void ExitWriter(TKey key)
    {
        ReaderWriterLockSlim localock;
        if (_locks.TryGetValue(key, out localock))
            localock.ExitWriteLock();
    }
}
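As a side note, the one-lock-per-key guarantee can also be sketched without the SpinLock by storing Lazy values and letting GetOrAdd pick the winner. This is an alternative, not what the test project does, and the class name LazyKeyedLocks is made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical alternative to DictionaryLock.GetLock, not part of the test project.
public class LazyKeyedLocks<TKey>
{
    private readonly ConcurrentDictionary<TKey, Lazy<ReaderWriterLockSlim>> _locks =
        new ConcurrentDictionary<TKey, Lazy<ReaderWriterLockSlim>>();

    // Returns the single ReaderWriterLockSlim that belongs to the key.
    public ReaderWriterLockSlim GetLock(TKey key)
    {
        // GetOrAdd may run the factory on several racing threads, but only one
        // Lazy is stored, and only the stored Lazy ever creates the actual lock.
        return _locks.GetOrAdd(key, _ => new Lazy<ReaderWriterLockSlim>(
            () => new ReaderWriterLockSlim(),
            LazyThreadSafetyMode.ExecutionAndPublication)).Value;
    }
}

public static class LazyKeyedLocksDemo
{
    // Usage sketch: guard a read of one (hypothetical) file name.
    public static bool ReadUnderLock(LazyKeyedLocks<string> locks, string fileName)
    {
        var rw = locks.GetLock(fileName);
        rw.EnterReadLock();
        try
        {
            // the file read would go here
            return rw.IsReadLockHeld;
        }
        finally
        {
            rw.ExitReadLock();
        }
    }
}
```

The trade-off is an extra Lazy allocation per key in exchange for not having a single spinlock that all first-time inserts pass through. I have not benchmarked it against the version above.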


The QueuedExecution is an abstract class that provides an easy way to implement queued object handling. It uses a ManualResetEventSlim to notify the caller that its request has been processed. It could use better exception handling; I did the minimum for this test project.


using System;
using System.Collections.Concurrent;
using System.Threading;

/// <summary>
/// Abstract Queued Execution
/// <para>Provides infrastructure for executing IItems with a ProcessQueue override
/// on the number of threads defined in the constructor</para>
/// </summary>
public abstract class QueuedExecution : IDisposable
{
    /// <summary>
    /// Process Result, returned by ProcessQueue
    /// </summary>
    protected enum ProcessResult
    {
        Success,
        FailThrow,
        FailRequeue
    }

    /// <summary>
    /// Item interface
    /// </summary>
    protected interface IItem {}

    /// <summary>
    /// Queue Item container
    /// </summary>
    private class QueueItem
    {
        /// <summary>
        /// IItem
        /// </summary>
        public IItem Item { get; set; }

        /// <summary>
        /// Result of ProcessQueue
        /// </summary>
        public ProcessResult ProcessResult { get; set; }

        /// <summary>
        /// ManualResetEvent for pinging back the waiting call
        /// </summary>
        public ManualResetEventSlim resetEvent { get; set; } 
    }

    /// <summary>
    /// Queue containing all the items for execution
    /// </summary>
    private ConcurrentQueue<QueueItem> _queue = new ConcurrentQueue<QueueItem>();

    /// <summary>
    /// Process Queue method, should be overridden in inheriting class
    /// </summary>
    /// <param name="item">item to be executed against</param>
    /// <returns>success/fail/requeue</returns>
    protected abstract ProcessResult ProcessQueue(IItem item);

    /// <summary>
    /// Number of threads to process queue
    /// </summary>
    private int _threadcount = 1;

    /// <summary>
    /// Threads array
    /// </summary>
    private Thread[] _threads;

    /// <summary>
    /// flag, signals all executing threads to stop; volatile so the worker threads see the change
    /// </summary>
    private volatile bool _threadaborted = false;

    /// <summary>
    /// Initializes the threads for execution
    /// </summary>
    private void Initialize()
    {
        _threads = new Thread[_threadcount];
        for (var i = 0; i < _threadcount; i++)
            _threads[i] = new Thread(new ThreadStart(() =>
                {
                    do
                    {
                        QueueItem item;
                        if (_queue.TryDequeue(out item))
                        {
                            item.ProcessResult = ProcessQueue(item.Item);

                            if (item.ProcessResult == ProcessResult.FailRequeue)
                            {
                                _queue.Enqueue(item);
                                continue;
                            }

                            item.resetEvent.Set();
                        }
                        else
                        {
                            Thread.Sleep(1);
                        }
                    } while (!_threadaborted);
                }));
        for (var i = 0; i < _threadcount; i++)
            _threads[i].Start();
    }

    protected QueuedExecution(int threads)
    {
        _threadcount = threads;
        Initialize();
    }

    /// <summary>
    /// Execute call in queue, block until processed
    /// </summary>
    /// <param name="item"></param>
    protected void Execute(IItem item)
    {
        var resetevent = new ManualResetEventSlim();
        var qi = new QueueItem
            {
                Item = item,
                resetEvent = resetevent
            };
        _queue.Enqueue(qi);
        resetevent.Wait();

        if (qi.ProcessResult == ProcessResult.FailThrow)
            throw new Exception("execution failed");
    }

    #region IDisposable Members

    /// <summary>
    /// cleanup
    /// </summary>
    /// <param name="waitForFinish">should wait for process to finish 
    /// currently executing request or abort immediately</param>
    /// <param name="wait">time to wait for abort to finish</param>
    public void Dispose(bool waitForFinish,TimeSpan wait)
    {
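        _threadaborted = true;
        //assume threads are still running until the check below proves otherwise
        bool allaborted = false;
        if (waitForFinish)
        {
            //wait for timeout, check if threads stopped gracefully in that time
            var deadline = DateTime.UtcNow + wait;
            while (DateTime.UtcNow < deadline)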
            {
                allaborted = true;
                foreach (var t in _threads)
                {
                    if (t.IsAlive == true)
                    {
                        allaborted = false;
                        break;
                    }
                }
                if (allaborted == true)
                    break;

                Thread.Sleep(1);
            }
        }

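        //if not all threads have stopped, abort them
        //(Thread.Abort works on .NET Framework only; .NET Core and later throw PlatformNotSupportedException)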
        if (allaborted == false)
        {
            foreach (var t in _threads)
                if (t.IsAlive)
                    t.Abort();
        }
    }

    public void Dispose()
    {
        Dispose(false,TimeSpan.MinValue);
    }

    #endregion
}
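The worker loop above polls the queue and sleeps 1 ms whenever it is empty. For comparison, here is a rough sketch of a worker that blocks on the queue instead, using BlockingCollection. It is not part of the test project and was not measured in the results above; the class name BlockingWorkQueue is made up, and it takes plain Action delegates rather than IItem to keep the example short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical variant of the queued execution idea, not part of the test project.
public sealed class BlockingWorkQueue : IDisposable
{
    private readonly BlockingCollection<Action> _work = new BlockingCollection<Action>();
    private readonly Thread[] _threads;

    public BlockingWorkQueue(int threads)
    {
        _threads = new Thread[threads];
        for (var i = 0; i < threads; i++)
        {
            _threads[i] = new Thread(() =>
            {
                // Blocks while the queue is empty, ends after CompleteAdding
                // has been called and the remaining items are drained.
                foreach (var action in _work.GetConsumingEnumerable())
                    action();
            });
            _threads[i].IsBackground = true;
            _threads[i].Start();
        }
    }

    // Queue the action and block until a worker has run it.
    public void Execute(Action action)
    {
        var done = new ManualResetEventSlim();
        Exception error = null;
        _work.Add(() =>
        {
            try { action(); }
            catch (Exception ex) { error = ex; }
            finally { done.Set(); }   // always release the waiting caller
        });
        done.Wait();
        if (error != null)
            throw new Exception("execution failed", error);
    }

    // Stop accepting work and wait for the workers to finish what is queued.
    public void Dispose()
    {
        _work.CompleteAdding();
        foreach (var t in _threads)
            t.Join();
        _work.Dispose();
    }
}
```

Because the workers leave their loop on their own once CompleteAdding is called, this version needs neither the abort flag nor Thread.Abort. It also has no re-queue step, so it only illustrates the waiting part.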


You can find the source code here:
https://github.com/drorgl/ForBlog/tree/master/FileCollisionTests
