using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
namespace HelloWorldApp
{
/// <summary>
/// Entry point that wires the six stages of <see cref="PiplelineStages"/> together.
/// </summary>
class TestImmutable
{
    /// <summary>
    /// Runs the whole pipeline. Stages 1-3 (enumerate files, load lines, count words)
    /// run concurrently and are awaited together; stages 4-6 (transfer, add color,
    /// show) are started afterwards and are awaited as well.
    /// </summary>
    /// <returns>A task that completes only after all six stages have finished.</returns>
    public static async Task StartPiplelineAsync()
    {
        // BlockingCollection<T> is IDisposable; the using blocks release the
        // collections once every stage that touches them has completed.
        using (var filenames = new BlockingCollection<string>())
        using (var lines = new BlockingCollection<string>())
        using (var items = new BlockingCollection<Info>())
        using (var coloredItems = new BlockingCollection<Info>())
        {
            var words = new ConcurrentDictionary<string, int>();

            Task t1 = PiplelineStages.ReadFilenamesAsync(@"../../..", filenames);
            ColoredConsole.WriteLine("start stage 1");
            Task t2 = PiplelineStages.LoadContentAsync(filenames, lines);
            ColoredConsole.WriteLine("start stage 2");
            Task t3 = PiplelineStages.ProcessContentAsync(lines, words);
            await Task.WhenAll(t1, t2, t3);
            ColoredConsole.WriteLine("stage 1,2,3 completed");

            Task t4 = PiplelineStages.TransferContentAsync(words, items);
            Task t5 = PiplelineStages.AddColorAsync(items, coloredItems);
            Task t6 = PiplelineStages.ShowContentAsync(coloredItems);

            // Without this await the method returned while stages 4-6 were still
            // running, and any exception they threw was never observed.
            await Task.WhenAll(t4, t5, t6);
        }
    }
}
/// <summary>
/// A word together with its occurrence count and the console color assigned to it.
/// </summary>
class Info
{
    /// <summary>The word that was counted.</summary>
    public string Word { get; set; }

    /// <summary>Number of occurrences recorded for <see cref="Word"/>.</summary>
    public int Count { get; set; }

    /// <summary>
    /// Color name attached to this item; ShowContentAsync passes it to
    /// ColoredConsole.WriteLine, which parses it as a <see cref="ConsoleColor"/> name.
    /// </summary>
    public string Color { get; set; }

    /// <summary>Formats the item as "&lt;Count&gt;times:&lt;Word&gt;".</summary>
    // The original interpolation hole was empty ("{}"), which does not compile;
    // the count is the value that belongs in front of "times:".
    public override string ToString() => $"{Count}times:{Word}";
}
/// <summary>
/// The six stages of the word-count pipeline. Stages hand data to each other through
/// <see cref="BlockingCollection{T}"/> instances; every producing stage calls
/// <c>CompleteAdding</c> on its output in a <c>finally</c> block so that a consumer
/// enumerating that collection terminates even when the producer fails.
/// </summary>
static class PiplelineStages
{
    // Characters that separate words in a source line (stage 3). Hoisted so the
    // array is not re-allocated for every line.
    private static readonly char[] wordSeparators =
        { ' ', ';', '\t', '{', '}', '(', ')', ':', ',', '"' };

    /// <summary>
    /// Stage 1: adds the path of every "*.cs" file found under <paramref name="path"/>
    /// (all subdirectories included) to <paramref name="output"/>, then marks the
    /// collection as complete.
    /// </summary>
    /// <param name="path">Root directory to search.</param>
    /// <param name="output">Collection that receives the file names.</param>
    /// <returns>The long-running task executing the stage.</returns>
    public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
    {
        return Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
                {
                    output.Add(filename);
                    ColoredConsole.WriteLine($"stage 1:added{filename}");
                }
            }
            finally
            {
                // Always signal completion, otherwise stage 2 waits forever
                // if the directory enumeration throws.
                output.CompleteAdding();
            }
        }, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Stage 2: opens every file named in <paramref name="input"/> and adds each of
    /// its lines to <paramref name="output"/>. The output is marked complete once all
    /// files have been read.
    /// </summary>
    /// <param name="input">File names produced by stage 1.</param>
    /// <param name="output">Collection that receives the lines.</param>
    /// <returns>A task that completes when every file has been read.</returns>
    public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
    {
        try
        {
            // GetConsumingEnumerable() is required here: the collection is still being
            // filled while it is read, and this enumerator blocks until an item is
            // available or adding has been completed.
            foreach (var filename in input.GetConsumingEnumerable())
            {
                using (FileStream stream = File.OpenRead(filename))
                using (var reader = new StreamReader(stream))
                {
                    string line;
                    while ((line = await reader.ReadLineAsync()) != null)
                    {
                        output.Add(line);
                        ColoredConsole.WriteLine($"stage 2:added{line}");
                    }
                }
            }
        }
        finally
        {
            // Must be called, otherwise the reader of 'output' keeps waiting for more
            // items. It has to happen after the loop: completing inside the loop made
            // output.Add throw InvalidOperationException on the second file and never
            // completed the collection when there were no files at all.
            output.CompleteAdding();
        }
    }

    /// <summary>
    /// Stage 3: splits every line from <paramref name="input"/> into words and counts
    /// the occurrences of each word in <paramref name="output"/>.
    /// </summary>
    /// <param name="input">Lines produced by stage 2.</param>
    /// <param name="output">Dictionary mapping each word to its occurrence count.</param>
    /// <returns>The long-running task executing the stage.</returns>
    public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
    {
        return Task.Factory.StartNew(() =>
        {
            foreach (var line in input.GetConsumingEnumerable())
            {
                // RemoveEmptyEntries drops the empty strings produced by adjacent separators.
                string[] words = line.Split(wordSeparators, StringSplitOptions.RemoveEmptyEntries);
                foreach (var word in words)
                {
                    output.AddOrUpdate(key: word, addValue: 1, updateValueFactory: (k, count) => count + 1);
                    ColoredConsole.WriteLine($"stage 3:added{word}");
                }
            }
        }, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Stage 4: converts every word/count pair in <paramref name="input"/> into an
    /// <see cref="Info"/> and adds it to <paramref name="output"/>, then marks the
    /// collection as complete.
    /// </summary>
    /// <param name="input">Word counts produced by stage 3.</param>
    /// <param name="output">Collection that receives the created items.</param>
    /// <returns>The long-running task executing the stage.</returns>
    public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
    {
        return Task.Factory.StartNew(() =>
        {
            try
            {
                // Enumerating the pairs directly avoids a second lookup per key.
                foreach (KeyValuePair<string, int> entry in input)
                {
                    var info = new Info { Word = entry.Key, Count = entry.Value };
                    output.Add(info);
                    ColoredConsole.WriteLine($"stage 4:added{info}");
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Stage 5: assigns a color to every item from <paramref name="input"/> based on
    /// its count and forwards it to <paramref name="output"/>, then marks the
    /// collection as complete.
    /// </summary>
    /// <param name="input">Items produced by stage 4.</param>
    /// <param name="output">Collection that receives the colored items.</param>
    /// <returns>The long-running task executing the stage.</returns>
    public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
    {
        return Task.Factory.StartNew(() =>
        {
            try
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    item.Color = ColorFor(item.Count);
                    output.Add(item);
                    ColoredConsole.WriteLine($"stage 5:added color {item.Color} to {item}");
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Stage 6: writes every item from <paramref name="input"/> to the console using
    /// the item's color.
    /// </summary>
    /// <param name="input">Colored items produced by stage 5.</param>
    /// <returns>The long-running task executing the stage.</returns>
    public static Task ShowContentAsync(BlockingCollection<Info> input)
    {
        return Task.Factory.StartNew(() =>
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                ColoredConsole.WriteLine($"stage 6:{item}", item.Color);
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Maps an occurrence count to a color name: above 40 is "Red",
    // above 20 is "Yellow", everything else is "Green".
    private static string ColorFor(int count)
    {
        if (count > 40)
        {
            return "Red";
        }
        if (count > 20)
        {
            return "Yellow";
        }
        return "Green";
    }
}
/// <summary>
/// Console writer that serializes output through a lock so that lines written from
/// several pipeline stages are not interleaved.
/// </summary>
static class ColoredConsole
{
    // readonly: the lock target must never be reassigned, or two threads could end
    // up locking on different objects.
    private static readonly object syncOutput = new object();

    /// <summary>Writes <paramref name="message"/> followed by a line terminator.</summary>
    /// <param name="message">Text to write.</param>
    public static void WriteLine(string message)
    {
        lock (syncOutput)
        {
            Console.WriteLine(message);
        }
    }

    /// <summary>
    /// Writes <paramref name="message"/> in the given foreground color and resets the
    /// console colors afterwards.
    /// </summary>
    /// <param name="message">Text to write.</param>
    /// <param name="color">Name of a <see cref="ConsoleColor"/> member, e.g. "Red".</param>
    /// <exception cref="ArgumentNullException"><paramref name="color"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="color"/> is not a <see cref="ConsoleColor"/> name.</exception>
    public static void WriteLine(string message, string color)
    {
        lock (syncOutput)
        {
            // Parse before touching the console so an invalid name leaves the colors untouched.
            var foreground = (ConsoleColor)Enum.Parse(typeof(ConsoleColor), color);
            Console.ForegroundColor = foreground;
            try
            {
                Console.WriteLine(message);
            }
            finally
            {
                // Restore the colors even when the write fails.
                Console.ResetColor();
            }
        }
    }
}
}
// Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号