browse by category or date

The motivation of this project is simple, I want to familiarize myself with System.Collections.Concurrent namespace. This project is a console application which will search files for a specified text. User can specify a number of parameters:

  1. text to be searched
  2. extension of the files to be searched (default is any extension. For simplicity, only accept one extension, e.g. .cs)
  3. starting location (default is current directory) inclusive of all sub-directories under it
  4. how many threads to use for searching (default is 8 threads)

The program will start two threads. A producer and a consumer. Producer will traverse the directory, pick the suitable files and put them into the queue. Consumer will spawn a number of worker threads. Each worker thread will then take one file from the queue, and searches its content. If matches, it will put the filename into a collection.

Since many threads will access the queue, we must use a thread-safe queue type. Hence, we use ConcurrentQueue.

To easily parse the command line parameters, CommandLineParse is my go to package.

Install-Package CommandLineParser -Version 2.8.0

We then specify how the command line parameters are structured:

        public class Options
        {
            private int _maxThread = 8;

            [Option('e', "extension", Required = false, HelpText = "File extension, default: .*")]
            public string Extensions { get; set; }

            [Option('l', "location", Required = false, HelpText = "Starting location, default: current directory")]
            public string Location { get; set; }

            [Option('s', "search", Required = true, HelpText = "String to be searched")]
            public string Search { get; set; }

            [Option('m', "maxthread", Required = false, HelpText = "Max number of process thread, default: 8")]
            public int MaxThreads
            {
                get { return _maxThread; }
                set { _maxThread = value; }
            }
        }

If all parameters are valid, enter the main functionality. Otherwise, it will automatically print out the help text.

static void Main(string[] args)
{
	Parser.Default.ParseArguments<Options>(args)
		.WithParsed<Options>(o =>
		{      
			Console.WriteLine($"Searching for: {o.Search}");
			var startingLocation = Environment.CurrentDirectory;
			if (!string.IsNullOrEmpty(o.Location))
				startingLocation = o.Location;
			Console.WriteLine($"Starting Location: {startingLocation}");
			if (!string.IsNullOrEmpty(o.Extensions))
			{
				if (o.Extensions.StartsWith(".") && o.Extensions.Length > 1)
					Console.WriteLine($"File extension: {o.Extensions}");
				else
				{
					Console.WriteLine("Invalid extension parameter. Must be: .??? E.g: .cs");
					return;
				}
			}
			else
				Console.WriteLine("File extension: .*");

			DoSearch(o.Search, startingLocation, o.Extensions, o.MaxThreads);
		});
}

Run the producer and consumer threads:

static void DoSearch(string search, string location, string extension, int maxThreads)
{
	var queue = new ConcurrentQueue<string>();
	var result = new ConcurrentBag<string>();

	var producer = Task.Run(() =>
	{
		var loc = new DirectoryInfo(location);
		if (loc.Exists)
		{
			ProcessDirectory(loc.FullName, extension, queue);
		}
	});

	var consumer = Task.Run(() =>
	{
		var taskList = new List<Task>();

		while (true)
		{
			//wait until there are items in the queue 
			while (queue.Count() == 0 && producer.Status == TaskStatus.Running)
			{
				Thread.Sleep(1000);
			}

			//wait until the number of running threads is below maxThreads
			while (taskList.Count(x => x.Status == TaskStatus.Running) >= maxThreads)
			{
				Thread.Sleep(1000);
			}

			//launch worker thread
			taskList.Add(Task.Run(() =>
			{
				ProcessQueueu(queue, result, search);
			}));

			//no more item to process and producer is no longer running
			if (queue.Count() == 0 && producer.Status == TaskStatus.RanToCompletion)
				break;
		}
	});

	//wait until both producer and consumer are completed
	Task.WaitAll(producer, consumer);

	//Print out the result
	foreach (var res in result)
		Console.WriteLine(res);
}

Producer’s main function (EnumerateDirectory can be found HERE):

private static void ProcessDirectory(string loc, string extension, ConcurrentQueue<string> queue)
{   
	//Get all files under this location and its sub-directories
	var enumerator = new EnumerateDirectory.EnumerateDirectory(loc, true);
	var allPaths = enumerator.ToArray();
	enumerator.Dispose();

	var ctr = 0;
	foreach (string path in allPaths)
	{
		++ctr;
		if (ctr % 1000 == 0) Thread.Sleep(1000); //Take a rest every 1000 paths

		//process files, exclude directories
		if (File.Exists(path))
		{
			//find search candidate
			if (string.IsNullOrEmpty(extension) || path.ToLower().EndsWith(extension.ToLower().Trim()))
			{
				Console.WriteLine($"{ctr}/{allPaths.Count()} Enqueu {path}");
				queue.Enqueue(path);
			}
		}
	}

}

Consumer’s main function:

private static void ProcessQueueu(ConcurrentQueue<string> queue, ConcurrentBag<string> res, string search)
{
	//safely dequeue item
	if (queue.TryDequeue(out string str))
	{                  
		try
		{
			FileInfo f = new FileInfo(str);
			using var strm = f.OpenText();
			var reader = strm.ReadToEnd();
			int startPos = 0;
			int foundPos = reader.IndexOf(search);

			while (foundPos >= 0)
			{
				res.Add($"{f.FullName}:{foundPos}");
				startPos = foundPos + 1;
				foundPos = reader.IndexOf(search, startPos);
			}
		}
		catch (Exception ex)
		{
			Console.WriteLine($"processing {str} failed: {ex.GetBaseException().Message}.");
			return;
		}                 
	}
}

Example of running this program:

That’s all folks. I hope it helps, cheers!

GD Star Rating
loading...
Multi-threaded File Search using ConcurrentQueue, 3.0 out of 5 based on 1 rating

Possibly relevant:

About Hardono

Howdy! I'm Hardono. I am working as a Software Developer. I am working mostly in Windows, dealing with .NET, conversing in C#. But I know a bit of Linux, mainly because I need to keep this blog operational. I've been working in Logistics/Transport industry for more than 11 years.

No Comment

Add Your Comment