The motivation for this project is simple: I want to familiarize myself with the System.Collections.Concurrent namespace. The project is a console application that searches files for a specified text. The user can specify a number of parameters:
- text to be searched
- extension of the files to be searched (default is any extension; for simplicity, only one extension is accepted, e.g. .cs)
- starting location (default is current directory) inclusive of all sub-directories under it
- how many threads to use for searching (default is 8 threads)
The program starts two threads: a producer and a consumer. The producer traverses the directory tree, picks the suitable files and puts them into the queue. The consumer spawns a number of worker threads. Each worker thread takes a file from the queue and searches its content. If the text is found, the worker puts the file name (together with the match position) into a result collection.
Since many threads will access the queue, we must use a thread-safe queue type. Hence, we use ConcurrentQueue.
To easily parse the command line parameters, CommandLineParser is my go-to package.
Install-Package CommandLineParser -Version 2.8.0
We then specify how the command line parameters are structured:
/// <summary>
/// Command-line options understood by the file-search tool.
/// Each property is bound to a short and a long switch through its <c>Option</c> attribute.
/// </summary>
public class Options
{
    /// <summary>Single file extension to restrict the search to (e.g. <c>.cs</c>); empty means any file.</summary>
    [Option('e', "extension", Required = false, HelpText = "File extension, default: .*")]
    public string Extensions { get; set; }

    /// <summary>Directory where the search starts; empty means the current directory.</summary>
    [Option('l', "location", Required = false, HelpText = "Starting location, default: current directory")]
    public string Location { get; set; }

    /// <summary>Text to look for inside the files. Mandatory.</summary>
    [Option('s', "search", Required = true, HelpText = "String to be searched")]
    public string Search { get; set; }

    /// <summary>Upper bound on concurrently running search workers; 8 unless overridden.</summary>
    [Option('m', "maxthread", Required = false, HelpText = "Max number of process thread, default: 8")]
    public int MaxThreads { get; set; } = 8;
}
If all parameters are valid, the program enters its main functionality. Otherwise, the parser automatically prints out the help text.
/// <summary>
/// Entry point: parses the command line into <see cref="Options"/>, validates the
/// values and, when they are acceptable, starts the search.
/// </summary>
/// <param name="args">Raw command-line arguments.</param>
static void Main(string[] args)
{
    Parser.Default.ParseArguments<Options>(args)
        .WithParsed<Options>(o =>
        {
            Console.WriteLine($"Searching for: {o.Search}");

            // Fall back to the current directory when no location was supplied.
            var startingLocation = string.IsNullOrEmpty(o.Location)
                ? Environment.CurrentDirectory
                : o.Location;
            Console.WriteLine($"Starting Location: {startingLocation}");

            if (string.IsNullOrEmpty(o.Extensions))
            {
                Console.WriteLine("File extension: .*");
            }
            else if (o.Extensions.StartsWith(".", StringComparison.Ordinal) && o.Extensions.Length > 1)
            {
                // Ordinal comparison: the leading dot is a literal character, not linguistic text.
                Console.WriteLine($"File extension: {o.Extensions}");
            }
            else
            {
                Console.WriteLine("Invalid extension parameter. Must be: .??? E.g: .cs");
                return;
            }

            // A zero or negative worker count can never make progress, so reject it up front
            // instead of handing it to the search.
            if (o.MaxThreads < 1)
            {
                Console.WriteLine("Invalid maxthread parameter. Must be 1 or greater.");
                return;
            }

            DoSearch(o.Search, startingLocation, o.Extensions, o.MaxThreads);
        });
}
Run the producer and consumer threads:
/// <summary>
/// Runs the producer/consumer search and prints every match to the console.
/// The producer enqueues candidate file paths found under <paramref name="location"/>;
/// the consumer starts a fixed set of workers that drain the queue until the producer
/// has finished and the queue is empty.
/// </summary>
/// <param name="search">Text to look for in each file.</param>
/// <param name="location">Directory where the traversal starts. Nothing is searched if it does not exist.</param>
/// <param name="extension">File extension filter passed to the producer; null or empty means no filter.</param>
/// <param name="maxThreads">Number of worker tasks; values below 1 are treated as 1.</param>
/// <exception cref="AggregateException">
/// Thrown by <c>Task.WaitAll</c> when the producer or a worker fails with an unhandled exception.
/// </exception>
static void DoSearch(string search, string location, string extension, int maxThreads)
{
    // How long an idle worker waits before looking at the queue again.
    const int IdlePollMilliseconds = 50;

    var queue = new ConcurrentQueue<string>();
    var result = new ConcurrentBag<string>();

    // A non-positive worker count would never drain the queue.
    int workerCount = Math.Max(1, maxThreads);

    var producer = Task.Run(() =>
    {
        var loc = new DirectoryInfo(location);
        if (loc.Exists)
        {
            ProcessDirectory(loc.FullName, extension, queue);
        }
    });

    var consumer = Task.Run(() =>
    {
        var workers = new Task[workerCount];
        for (int i = 0; i < workers.Length; i++)
        {
            workers[i] = Task.Run(() =>
            {
                while (true)
                {
                    // Sample the producer state BEFORE inspecting the queue: if the producer
                    // was already finished and the queue is empty afterwards, nothing more
                    // can arrive. IsCompleted also covers a faulted or cancelled producer,
                    // so a producer failure cannot leave the workers spinning forever.
                    bool producerDone = producer.IsCompleted;

                    if (!queue.IsEmpty)
                    {
                        // Dequeues and searches at most one file per call.
                        ProcessQueueu(queue, result, search);
                    }
                    else if (producerDone)
                    {
                        break;
                    }
                    else
                    {
                        Thread.Sleep(IdlePollMilliseconds);
                    }
                }
            });
        }

        // The consumer is only done once every worker has finished its last file;
        // otherwise results could be printed while matches are still being added.
        Task.WaitAll(workers);
    });

    //wait until both producer and consumer are completed
    Task.WaitAll(producer, consumer);

    //Print out the result
    foreach (var res in result)
    {
        Console.WriteLine(res);
    }
}
Producer’s main function (EnumerateDirectory can be found HERE):
/// <summary>
/// Producer body: lists every path under <paramref name="loc"/> and enqueues the files
/// whose name ends with <paramref name="extension"/>.
/// </summary>
/// <param name="loc">Directory to enumerate.</param>
/// <param name="extension">
/// Extension to match, compared case-insensitively after trimming surrounding whitespace;
/// null or empty accepts every file.
/// </param>
/// <param name="queue">Queue that receives the paths of the candidate files.</param>
private static void ProcessDirectory(string loc, string extension, ConcurrentQueue<string> queue)
{
    //Get all files under this location and its sub-directories
    // NOTE(review): the second argument presumably enables recursion - confirm against EnumerateDirectory.
    var enumerator = new EnumerateDirectory.EnumerateDirectory(loc, true);
    var allPaths = enumerator.ToArray();
    enumerator.Dispose();

    // Loop-invariant values: compute them once instead of once per path.
    var total = allPaths.Count();
    var wantedExtension = string.IsNullOrEmpty(extension) ? null : extension.Trim();

    var ctr = 0;
    foreach (string path in allPaths)
    {
        ++ctr;
        if (ctr % 1000 == 0)
        {
            Thread.Sleep(1000); //Take a rest every 1000 paths
        }

        //process files, exclude directories
        if (!File.Exists(path))
        {
            continue;
        }

        //find search candidate
        // OrdinalIgnoreCase instead of ToLower()+EndsWith: file extensions are identifiers,
        // so the match must not depend on the current culture's casing rules.
        if (wantedExtension == null || path.EndsWith(wantedExtension, StringComparison.OrdinalIgnoreCase))
        {
            Console.WriteLine($"{ctr}/{total} Enqueu {path}");
            queue.Enqueue(path);
        }
    }
}
Consumer’s main function:
/// <summary>
/// Worker body: takes at most one file path from <paramref name="queue"/>, reads the whole
/// file as text and records every position at which <paramref name="search"/> occurs.
/// </summary>
/// <param name="queue">Queue of file paths to search; one item is removed if available.</param>
/// <param name="res">Receives one entry per match, formatted as <c>fullPath:charIndex</c>.</param>
/// <param name="search">Text to look for (ordinal, case-sensitive). Null or empty yields no matches.</param>
/// <remarks>
/// Failures while reading a file are reported on the console and do not propagate.
/// </remarks>
private static void ProcessQueueu(ConcurrentQueue<string> queue, ConcurrentBag<string> res, string search)
{
    //safely dequeue item; another worker may already have emptied the queue
    if (!queue.TryDequeue(out string str))
    {
        return;
    }

    // An empty pattern is "found" at every index and would finally push the start index
    // past the end of the text; treat it as "no match" (the item is still consumed).
    if (string.IsNullOrEmpty(search))
    {
        return;
    }

    try
    {
        FileInfo f = new FileInfo(str);

        string content;
        using (var strm = f.OpenText())
        {
            content = strm.ReadToEnd();
        }

        // Ordinal search: file content is matched character by character, independent of
        // the current culture's linguistic comparison rules.
        int foundPos = content.IndexOf(search, StringComparison.Ordinal);
        while (foundPos >= 0)
        {
            res.Add($"{f.FullName}:{foundPos}");

            // Advance by a single character so overlapping occurrences are reported too.
            foundPos = content.IndexOf(search, foundPos + 1, StringComparison.Ordinal);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"processing {str} failed: {ex.GetBaseException().Message}.");
    }
}
Example of running this program:

That’s all folks. I hope it helps, cheers!
