The motivation for this project is simple: I want to familiarize myself with the System.Collections.Concurrent namespace. The project is a console application that searches files for a specified text. The user can specify a number of parameters:
- text to be searched
- extension of the files to be searched (default is any extension; for simplicity, only one extension is accepted, e.g. .cs)
- starting location (default is the current directory), including all sub-directories under it
- how many threads to use for searching (default is 8 threads)
The program starts two tasks: a producer and a consumer. The producer traverses the directory tree, picks the suitable files and puts them into a queue. The consumer spawns a number of worker threads. Each worker takes one file from the queue and searches its content. For every match, it puts the file name and the match position into a result collection.
Since many threads access the queue at the same time, we must use a thread-safe queue type, hence ConcurrentQueue. For the same reason, the results are collected in a ConcurrentBag.
To easily parse the command line parameters, CommandLineParser is my go-to package.
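To make the thread-safety point concrete, here is a minimal sketch that is independent of the file search itself. The class `QueueDemo` and the helper `DrainWithWorkers` are hypothetical names used only for this illustration. Several workers pull from one ConcurrentQueue, and because TryDequeue is atomic, every item is handed to exactly one worker.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class QueueDemo
{
    // Hypothetical helper: drains the queue with several workers and
    // returns how many items were processed in total.
    public static int DrainWithWorkers(ConcurrentQueue<int> queue, int workerCount)
    {
        var processed = new ConcurrentBag<int>();

        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() =>
            {
                // TryDequeue is atomic: each item goes to exactly one worker.
                while (queue.TryDequeue(out int item))
                    processed.Add(item);
            }))
            .ToArray();

        Task.WaitAll(workers);
        return processed.Count;
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<int>();
        for (int i = 0; i < 1000; i++)
            queue.Enqueue(i);

        int count = DrainWithWorkers(queue, 4);
        Console.WriteLine($"Processed {count} items, queue empty: {queue.IsEmpty}");
    }
}
```

With a plain Queue<T> the same code could lose items or throw, since Queue<T> is not safe for concurrent writers.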
Install-Package CommandLineParser -Version 2.8.0
We then specify how the command line parameters are structured:
    using CommandLine;

    public class Options
    {
        // Backing field holds the default used when -m is not supplied
        private int _maxThread = 8;

        [Option('e', "extension", Required = false, HelpText = "File extension, default: .*")]
        public string Extensions { get; set; }

        [Option('l', "location", Required = false, HelpText = "Starting location, default: current directory")]
        public string Location { get; set; }

        [Option('s', "search", Required = true, HelpText = "String to be searched")]
        public string Search { get; set; }

        [Option('m', "maxthread", Required = false, HelpText = "Max number of process thread, default: 8")]
        public int MaxThreads
        {
            get { return _maxThread; }
            set { _maxThread = value; }
        }
    }
If all parameters are valid, the program enters the main functionality. Otherwise, the parser automatically prints out the help text.
    static void Main(string[] args)
    {
        Parser.Default.ParseArguments<Options>(args)
            .WithParsed<Options>(o =>
            {
                Console.WriteLine($"Searching for: {o.Search}");

                var startingLocation = Environment.CurrentDirectory;
                if (!string.IsNullOrEmpty(o.Location))
                    startingLocation = o.Location;
                Console.WriteLine($"Starting Location: {startingLocation}");

                if (!string.IsNullOrEmpty(o.Extensions))
                {
                    if (o.Extensions.StartsWith(".") && o.Extensions.Length > 1)
                        Console.WriteLine($"File extension: {o.Extensions}");
                    else
                    {
                        Console.WriteLine("Invalid extension parameter. Must be: .??? E.g: .cs");
                        return;
                    }
                }
                else
                    Console.WriteLine("File extension: .*");

                DoSearch(o.Search, startingLocation, o.Extensions, o.MaxThreads);
            });
    }
Run the producer and consumer threads:
    static void DoSearch(string search, string location, string extension, int maxThreads)
    {
        var queue = new ConcurrentQueue<string>();
        var result = new ConcurrentBag<string>();

        var producer = Task.Run(() =>
        {
            var loc = new DirectoryInfo(location);
            if (loc.Exists)
            {
                ProcessDirectory(loc.FullName, extension, queue);
            }
        });

        var consumer = Task.Run(() =>
        {
            var taskList = new List<Task>();
            while (true)
            {
                // Wait until there are items in the queue or the producer has finished
                while (queue.IsEmpty && !producer.IsCompleted)
                {
                    Thread.Sleep(1000);
                }

                // No more items to process and the producer is no longer running.
                // Check the producer first so a last-moment enqueue is not missed.
                if (producer.IsCompleted && queue.IsEmpty)
                    break;

                // Wait until the number of unfinished workers is below maxThreads
                while (taskList.Count(x => !x.IsCompleted) >= maxThreads)
                {
                    Thread.Sleep(1000);
                }

                // Launch a worker that processes one file from the queue
                taskList.Add(Task.Run(() => ProcessQueueu(queue, result, search)));
            }

            // Wait for the remaining workers so that no result is missed
            Task.WaitAll(taskList.ToArray());
        });

        // Wait until both producer and consumer are completed
        Task.WaitAll(producer, consumer);

        // Print out the result
        foreach (var res in result)
            Console.WriteLine(res);
    }
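As a side note, the polling loops with Thread.Sleep in DoSearch could be avoided with BlockingCollection, which lives in the same System.Collections.Concurrent namespace. The following is only a sketch of that alternative, not what this project does. `BlockingDemo`, `Run` and the `isMatch` callback are hypothetical names, and the "files" are plain strings to keep the example self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class BlockingDemo
{
    // Hypothetical helper: runs one producer and `workerCount` consumers and
    // returns the items for which `isMatch` is true.
    public static string[] Run(string[] items, Func<string, bool> isMatch, int workerCount)
    {
        using var queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
        var result = new ConcurrentBag<string>();

        var producer = Task.Run(() =>
        {
            try
            {
                foreach (var item in items)
                    queue.Add(item);
            }
            finally
            {
                // Signals the consumers that no more items will arrive.
                queue.CompleteAdding();
            }
        });

        var consumers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() =>
            {
                // Blocks while the queue is empty; ends once adding is complete
                // and the queue has been drained.
                foreach (var item in queue.GetConsumingEnumerable())
                    if (isMatch(item))
                        result.Add(item);
            }))
            .ToArray();

        Task.WaitAll(consumers);
        producer.Wait();
        return result.ToArray();
    }

    public static void Main()
    {
        var files = new[] { "a.cs", "b.txt", "c.cs" };
        var matches = Run(files, f => f.EndsWith(".cs"), 2);
        Console.WriteLine(string.Join(", ", matches.OrderBy(m => m)));
    }
}
```

The trade-off is that the number of workers is fixed up front instead of being throttled in a loop, but no thread has to sleep and re-check the queue.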
Producer’s main function (EnumerateDirectory can be found HERE):
    private static void ProcessDirectory(string loc, string extension, ConcurrentQueue<string> queue)
    {
        // Get all files under this location and its sub-directories
        var enumerator = new EnumerateDirectory.EnumerateDirectory(loc, true);
        var allPaths = enumerator.ToArray();
        enumerator.Dispose();

        var ctr = 0;
        foreach (string path in allPaths)
        {
            ++ctr;
            if (ctr % 1000 == 0)
                Thread.Sleep(1000); // Take a rest every 1000 paths

            // Process files, exclude directories
            if (File.Exists(path))
            {
                // Find search candidates: any file, or files with the requested extension
                if (string.IsNullOrEmpty(extension)
                    || path.EndsWith(extension.Trim(), StringComparison.OrdinalIgnoreCase))
                {
                    Console.WriteLine($"{ctr}/{allPaths.Length} Enqueue {path}");
                    queue.Enqueue(path);
                }
            }
        }
    }
The worker's main function, launched by the consumer:
    private static void ProcessQueueu(ConcurrentQueue<string> queue, ConcurrentBag<string> res, string search)
    {
        // Safely dequeue one item; do nothing if the queue is already empty
        if (!queue.TryDequeue(out string path))
            return;

        try
        {
            var f = new FileInfo(path);
            using var strm = f.OpenText();
            var content = strm.ReadToEnd();

            // Record the position of every occurrence of the search text
            int foundPos = content.IndexOf(search);
            while (foundPos >= 0)
            {
                res.Add($"{f.FullName}:{foundPos}");
                foundPos = content.IndexOf(search, foundPos + 1);
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Processing {path} failed: {ex.GetBaseException().Message}.");
        }
    }
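The IndexOf loop in the worker can be tried in isolation. Below is a small sketch with a hypothetical helper `FindAll` (not part of the project) that returns every match position in a string. One assumption differs from the code above: it uses StringComparison.Ordinal, whereas the worker calls IndexOf without a comparison argument.

```csharp
using System;
using System.Collections.Generic;

public static class OccurrenceDemo
{
    // Hypothetical helper: returns the start index of every occurrence of
    // `search` in `content`, including overlapping ones.
    public static List<int> FindAll(string content, string search)
    {
        var positions = new List<int>();
        if (string.IsNullOrEmpty(search))
            return positions; // nothing sensible to report for an empty search string

        int pos = content.IndexOf(search, StringComparison.Ordinal);
        while (pos >= 0)
        {
            positions.Add(pos);
            // Advance by one character so that overlapping matches are found too.
            pos = content.IndexOf(search, pos + 1, StringComparison.Ordinal);
        }
        return positions;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", FindAll("abababa", "aba"))); // prints 0,2,4
    }
}
```

Advancing by one character rather than by the length of the search text is what makes overlapping matches show up, which matches the behaviour of the worker.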
Example of running this program:
That’s all folks. I hope it helps, cheers!
About Hardono