Creating a thread save queue

We’ve been working on a WPF project for a costumer some time now. One of the requirements of the application was the storage and live streaming of multiple network cameras.

Due to the project setup we decided to split the storage and the viewing part. A client application will stream the camera’s at a rate of 25 frames per second and a server application will record the frames at a rate of 2 frames a second. The camera’s (Mobotix and Axis) can handle the 2 streams and also the network capacity is high enough to create 2 different streams to the cameras.

For the server application I started with implementing the AForge framework (http://www.aforgenet.com/) to handle the camera streams. The Aforge Library has excellent support for audio and video streaming/editing. In our server application we only make use of the AForge.Video namespace. Setting up a stream to a network camera is quite simple.

public CameraStream(string ipAddress, int frameRate)
{
	string url = string.Format(@"http://{0}/control/faststream.jpg?stream=full&fps={1}", ipAddress, frameRate);
	MJPEGStream mjpegStream = new MJPEGStream(url);
	mjpegStream.NewFrame += VideoNewFrame;
	mjpegStream.VideoSourceError += VideoSourceError;
}

private void VideoNewFrame(object sender, NewFrameEventArgs eventArgs)
{
	//Add implementation
	throw new NotImplementedException();
}

private void VideoSourceError(object sender, VideoSourceErrorEventArgs eventArgs)
{
	//Add implementation
	throw new NotImplementedException();
}

Create a MJPEGStream with the URL of the camera as parameter. Attach 2 event handlers, one when a new frame is received, one if an error occurs. The error handling is not part of this post where we focus on asynchronously and thread save usage of a queue. Next part is to implement the event handler when a new frame is received. In the event argument a bitmap is exposed as current frame. We can save this bitmap to disk.

private void VideoNewFrame(object sender, NewFrameEventArgs eventArgs)
{
	//Get the bitmap from the stream
	Bitmap bitmap = eventArgs.Frame;

	//Set the path where we want to save the image
	string path = string.Format(@"d:\Temp\{0}.jpg",DateTime.Now.ToString());
	
	//Save the image
	bitmap.Save(path);
}

This seems allright and if we test with low number of cameras it seems to be running just fine. If we deployed the application to the test environment with 24 cameras we noticed the camera stream stopped every minute for a few seconds. After some debugging we found that the disk where the file was saved had some lack and blocked the saving part of our code for 20 – 30 seconds. Due to our implementation to save directly in the event handler all streams were halted and no images got recorded.

Up to asynchronously saving of the images. If access to the disk was blocked for a reason the image streams have to keep on streaming and the images need to be put in a queue. When the disk is writable again we can empty the queue and start saving to the disk. We first created a helper class to hold the images and to manage the queue. First implementation was a simple list that could hold objects with as parameters a Bitmap and a string (the image and the path where it has to be saved).

public class SaveImageQueue
{
	public List<ImageToSave> Queue { get; set; }
	private Object thisLock = new Object();

	/// <summary>
	/// Constructor
	/// </summary>
	public SaveImageQueue()
	{
		Queue = new List<ImageToSave>();
	}
	
	public void AddImage(Bitmap image, string path)
	{
		lock(thisLock)
		{
			Queue.Add(new ImageToSave(){FilePath = path,Bitmap=image});
		}
	}
	
	public ImageToSave GetImage()
	{
		lock(thisLock)
		{
			return Queue.FirstOrDefault();
		}
	}
}

/// <summary>
/// Helper class to add images and their filepath in the queue
/// </summary>
public class ImageToSave:IDisposable
{
	public string FilePath { get; set; }
	public Bitmap Bitmap { get; set; }

	/// <summary>
	/// Disposes the Bitmap in the helper class
	/// </summary>
	public void Dispose()
	{
		Bitmap.Dispose();
	}
}

In the code above you can see we use a lock object to be sure only one thread at the time can access the List<T>. In the new frame event handler we can just call the AddImage method to add an image to the queue. In a separate thread we will consume the GetImage method to fetch the first image from the queue and when it’s not null we can save it to disk.

If we put this to test we could notice almost directly that the List<T> got locked to much by the 24 camera streams and the consumer to save the images. Our simple solution wasn’t up to this load. When searching for a solution we stumbled upon the white paper “Thread-safe Collections in .NET Framework 4 and Their Performance Characteristics” you can find in this blog post. From the 4.0 version .Net has his built in thread-safe collections that perfect fit our needs. We dropped our own written lock system and added the BlockingCollection from the System.Collections.Concurrent namespace.


/// <summary>
/// Queue to async save images to disk
/// </summary>
public class SaveImageQueue
{
	public BlockingCollection<ImageToSave> Queue { get; set; }

	/// <summary>
	/// Constructor
	/// </summary>
	public SaveImageQueue()
	{
		Queue = new BlockingCollection<ImageToSave>();
	}

	/// <summary>
	/// Start the queue handling. Will check the queue and then saves the images one by one.
	/// </summary>
	public void StartQueue()
	{
		Task.Factory.StartNew(() =>
								  {
									  while (true)
									  {
										  ImageToSave imageToSave = null;
										  if (Queue.TryTake(out imageToSave))
										  {
											  Log.DebugFormat("1: Saving image from queue to {0}", imageToSave.FilePath);
											  try
											  {
												  imageToSave.Bitmap.Save(imageToSave.FilePath);
												  imageToSave.Dispose();
												  Log.DebugFormat("1: Queue is holding {0} images", Queue.Count);
											  }
											  catch (Exception ex)
											  {
												  Log.Error("1: Error reading and executing queue", ex);
											  }
										  }
									  }
								  });
}

/// <summary>
/// Helper class to add images and their filepath in the queue
/// </summary>
public class ImageToSave:IDisposable
{
	public string FilePath { get; set; }
	public Bitmap Bitmap { get; set; }

	#region Implementation of IDisposable

	/// <summary>
	/// Disposes the Bitmap in the helper class
	/// </summary>
	public void Dispose()
	{
		Bitmap.Dispose();
	}

	#endregion
}

As you can see we can now use the TryTake method to fetch an object from the collection and to save the image. The BlockingCollection will take care of the locking of the object and if we want we can add multiple consumers so the queue will be emptied faster. All this is put is a asynchronous Task so the camera streams will not notice anything from the saving of the Bitmap.

In the camera new frame event we can add the committing of the image to the queue. One note here is that you best add a clone of the bitmap you extracted from the event arguments. If the queue is emptied rapidly and you don’t use a clone of the image you’ll get GDI exceptions all around the place due to concurring accessing the same memory address space.

private void SaveImage(string filePath, Bitmap bitmap)
{
	queue.Queue.Add(new ImageToSave() { Bitmap = (Bitmap)bitmap.Clone(), FilePath = filePath });
	bitmap.Dispose();
}

Leave a Reply

Your email address will not be published. Required fields are marked *