<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
</Project>
-----------------------
using QuotesServer.Models;
namespace QuotesServer.Helpers;
public static class QuoteGenerator{ private static readonly Random rnd = new();
// Все доступные тикеры
private static readonly string[] tickers =
{
// Tech Stocks
"GOOGL", "MSFT", "NVDA",
// Consumer Stocks
"AMZN", "AAPL",
// Finance Stocks
"JPM", "BAC", "GS",
// Energy Stocks
"XOM", "CVX"
};
public static StockQuote Generate()
{
var ticker = tickers[rnd.Next(tickers.Length)];
// Генерируем разные цены для разных типов акций
decimal basePrice;
if (ticker == "GOOGL" || ticker == "AMZN" || ticker == "NVDA")
{
// Дорогие технологические акции
basePrice = (decimal)(rnd.Next(1000, 3000) + rnd.NextDouble());
}
else if (ticker == "JPM" || ticker == "GS")
{
// Финансовые акции
basePrice = (decimal)(rnd.Next(200, 500) + rnd.NextDouble());
}
else
{
// Обычные акции
basePrice = (decimal)(rnd.Next(50, 500) + rnd.NextDouble());
}
var spread = (decimal)(rnd.NextDouble() * 2);
return new StockQuote(
Timestamp: DateTime.Now,
Ticker: ticker,
Bid: Math.Round(basePrice - spread, 2),
Ask: Math.Round(basePrice + spread, 2),
Last: Math.Round(basePrice, 2),
Volume: rnd.Next(1, 10000)); // Увеличил максимальный объем
}
}
using System.IO.Pipes;
namespace QuotesServer.Interfaces;
public interface IClientConnection : IDisposable{ string Channel { get; } NamedPipeServerStream Stream { get; } bool IsConnected { get; } Task WriteAsync(string data);}
namespace QuotesServer.Models;
public class ChannelConfig{ public string ChannelName { get; set; } = string.Empty; public string DisplayName { get; set; } = string.Empty; public List<string> Tickers { get; set; } = new(); public int MaxClients { get; set; } = 3; // Максимум параллельных клиентов на канал public ConsoleColor Color { get; set; } = ConsoleColor.White;}
namespace QuotesServer.Models;
public record StockQuote(DateTime Timestamp, string Ticker, decimal Bid, decimal Ask, decimal Last, int Volume);
using System.IO.Pipes;using QuotesServer.Interfaces;
namespace QuotesServer.Services;
public class ClientConnection : IClientConnection{ private readonly StreamWriter _writer; private readonly string _channel; private bool _disposed;
public string Channel => _channel;
public NamedPipeServerStream Stream { get; }
public bool IsConnected => Stream.IsConnected && !_disposed;
public ClientConnection(NamedPipeServerStream stream, string channel)
{
Stream = stream;
_channel = channel;
_writer = new StreamWriter(stream)
{
AutoFlush = true,
NewLine = "\n"
};
}
public async Task WriteAsync(string data)
{
if (_disposed || !Stream.IsConnected)
throw new IOException($"Client disconnected from {_channel}");
try
{
await _writer.WriteLineAsync(data);
}
catch
{
throw;
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
try { _writer?.Dispose(); } catch { }
try { Stream?.Dispose(); } catch { }
}
}
using System.Collections.Concurrent;using QuotesServer.Interfaces;
namespace QuotesServer.Services;
public class EventHub{ private readonly ConcurrentDictionary<string, List<Func<string, Task>>> _handlers = new(); private readonly ConcurrentDictionary<string, List<IClientConnection>> _clients = new(); private readonly ILogger<EventHub>? _logger;
public EventHub(ILogger<EventHub>? logger = null)
{
_logger = logger;
}
public void Subscribe(string ticker, Func<string, Task> handler)
{
_handlers.AddOrUpdate(ticker,
_ => new List<Func<string, Task>> { handler },
(_, list) => { list.Add(handler); return list; });
}
public void RegisterClient(string channel, IClientConnection client)
{
_clients.AddOrUpdate(channel,
_ => new List<IClientConnection> { client },
(_, list) => { list.Add(client); return list; });
var count = _clients.TryGetValue(channel, out var list) ? list.Count : 0;
_logger?.LogInformation($"EventHub: Client registered on channel '{channel}' (Total: {count})");
}
public void UnregisterClient(string channel, IClientConnection client)
{
if (_clients.TryGetValue(channel, out var list))
{
if (list.Remove(client))
{
var count = list.Count;
_logger?.LogInformation($"EventHub: Client unregistered from channel '{channel}' (Remaining: {count})");
}
}
}
public async Task PublishToChannelAsync(string channel, string message)
{
if (!_clients.TryGetValue(channel, out var clients) || clients.Count == 0)
{
_logger?.LogDebug($"EventHub: No clients on channel '{channel}', message dropped");
return;
}
var disconnected = new List<IClientConnection>();
foreach (var client in clients)
{
try
{
if (!client.IsConnected)
{
_logger?.LogDebug($"EventHub: Client on '{channel}' is not connected");
disconnected.Add(client);
continue;
}
await client.WriteAsync(message);
}
catch
{
disconnected.Add(client);
}
}
foreach (var client in disconnected)
{
if (clients.Remove(client))
{
try { client.Dispose(); } catch { }
}
}
}
public void Publish(string ticker, string message)
{
if (_handlers.TryGetValue(ticker, out var handlers))
{
foreach (var handler in handlers)
{
_ = handler.Invoke(message);
}
}
}
public int GetClientCount(string channel)
{
return _clients.TryGetValue(channel, out var list) ? list.Count : 0;
}
}
using System.Collections.Concurrent;using QuotesServer.Helpers;using QuotesServer.Models;using System.Text.Json;using QuotesServer.Services;using System.Diagnostics;
namespace QuotesServer.BackgroundServices;
public sealed class ParallelQuoteServerService : BackgroundService{ private readonly ILogger<ParallelQuoteServerService> _logger; private readonly EventHub _eventHub; private readonly ConcurrentDictionary<string, PooledPipeServer> _pipeServers = new(); private readonly List<ChannelConfig> _channels; private readonly Dictionary<string, int> _publishCount = new(); private readonly string[] _allTickers; // Добавляем поле для всех тикеров
public ParallelQuoteServerService(
ILogger<ParallelQuoteServerService> logger,
EventHub eventHub)
{
_logger = logger;
_eventHub = eventHub;
_channels = new List<ChannelConfig>
{
new()
{
ChannelName = @"\\.\pipe\tech-stocks",
DisplayName = "Tech Stocks",
Tickers = new List<string> { "GOOGL", "MSFT", "NVDA" },
MaxClients = 3,
Color = ConsoleColor.Green
},
new()
{
ChannelName = @"\\.\pipe\consumer-stocks",
DisplayName = "Consumer Stocks",
Tickers = new List<string> { "AMZN", "AAPL" },
MaxClients = 3,
Color = ConsoleColor.Yellow
},
new()
{
ChannelName = @"\\.\pipe\finance-stocks",
DisplayName = "Finance Stocks",
Tickers = new List<string> { "JPM", "BAC", "GS" },
MaxClients = 2,
Color = ConsoleColor.Cyan
},
new()
{
ChannelName = @"\\.\pipe\energy-stocks",
DisplayName = "Energy Stocks",
Tickers = new List<string> { "XOM", "CVX" },
MaxClients = 2,
Color = ConsoleColor.Magenta
}
};
// Собираем все тикеры из всех каналов
_allTickers = _channels.SelectMany(c => c.Tickers).Distinct().ToArray();
// Инициализируем счетчики публикаций для всех тикеров
foreach (var ticker in _allTickers)
{
_publishCount[ticker] = 0;
}
_logger.LogInformation($"Initialized with {_allTickers.Length} tickers: {string.Join(", ", _allTickers)}");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("ParallelQuoteServerService: Starting with parallel channels...");
_logger.LogInformation($"Total tickers configured: {_allTickers.Length}");
// Запускаем все каналы параллельно
var serverTasks = new List<Task>();
foreach (var channel in _channels)
{
var server = new PooledPipeServer(
channel.ChannelName,
channel.DisplayName,
channel.MaxClients,
_eventHub,
_logger);
_pipeServers[channel.DisplayName] = server;
// Запускаем каждый канал в отдельной задаче
serverTasks.Add(Task.Run(async () =>
{
try
{
await server.StartAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error in channel {channel.DisplayName}");
}
}, stoppingToken));
_logger.LogInformation($"Channel '{channel.DisplayName}' started with {channel.MaxClients} parallel slots");
}
// Настраиваем маршрутизацию
SetupRouting();
// Запускаем генератор котировок
await RunQuoteGeneratorAsync(stoppingToken);
}
private async Task RunQuoteGeneratorAsync(CancellationToken stoppingToken)
{
var rnd = new Random();
var statsTimer = new Stopwatch();
statsTimer.Start();
while (!stoppingToken.IsCancellationRequested)
{
var quote = QuoteGenerator.Generate();
var serializedData = JsonSerializer.Serialize(quote);
// Увеличиваем счетчик
if (_publishCount.ContainsKey(quote.Ticker))
{
_publishCount[quote.Ticker]++;
}
else
{
_logger.LogWarning($"⚠️ Unknown ticker generated: {quote.Ticker}");
_publishCount[quote.Ticker] = 1;
}
_eventHub.Publish(quote.Ticker, serializedData);
// Логируем с цветом канала
var channel = _channels.FirstOrDefault(c => c.Tickers.Contains(quote.Ticker));
if (channel != null)
{
_logger.LogInformation($"📊 [{channel.DisplayName}] Published {quote.Ticker} @ {quote.Last:C} (Total: {_publishCount[quote.Ticker]})");
}
else
{
_logger.LogWarning($"⚠️ Ticker {quote.Ticker} not assigned to any channel!");
}
// Каждые 30 секунд показываем статистику
if (statsTimer.Elapsed.TotalSeconds >= 30)
{
LogStatistics();
statsTimer.Restart();
}
// Случайная задержка
await Task.Delay(rnd.Next(500, 1500), stoppingToken);
}
}
private void LogStatistics()
{
_logger.LogInformation("=== Publication Statistics ===");
foreach (var channel in _channels)
{
var channelStats = channel.Tickers.Select(t => $"{t}:{_publishCount.GetValueOrDefault(t, 0)}");
_logger.LogInformation($"{channel.DisplayName}: {string.Join(", ", channelStats)}");
// Проверяем, есть ли клиенты на канале
var clientCount = _eventHub.GetClientCount(channel.DisplayName);
_logger.LogInformation($" Clients connected: {clientCount}");
}
_logger.LogInformation("==============================");
}
private void SetupRouting()
{
foreach (var channel in _channels)
{
foreach (var ticker in channel.Tickers)
{
_eventHub.Subscribe(ticker, async data =>
{
var clientCount = _eventHub.GetClientCount(channel.DisplayName);
_logger.LogDebug($"Routing {ticker} to {channel.DisplayName} (clients: {clientCount})");
await _eventHub.PublishToChannelAsync(channel.DisplayName, data);
});
}
}
_logger.LogInformation("ParallelQuoteServerService: Routing configured for all channels");
foreach (var channel in _channels)
{
_logger.LogInformation($"{channel.DisplayName} → {string.Join(", ", channel.Tickers)} (Max {channel.MaxClients} parallel)");
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
LogStatistics();
_logger.LogInformation("ParallelQuoteServerService: Stopping...");
foreach (var server in _pipeServers.Values)
{
server.Dispose();
}
await base.StopAsync(cancellationToken);
}
}
using System.Collections.Concurrent;using System.IO.Pipes;using System.Text.Json;using QuotesServer.Interfaces;using QuotesServer.Models;
namespace QuotesServer.Services;
public class PooledPipeServer : IDisposable{ private readonly ILogger _logger; private readonly string _channelName; private readonly string _pipeName; private readonly int _maxClients; private readonly EventHub _eventHub; private readonly ConcurrentBag<Task> _activeConnections = new(); private readonly SemaphoreSlim _connectionSemaphore; private CancellationTokenSource _cts = new(); private bool _isDisposed;
public string ChannelName => _channelName;
public int ActiveConnections => _activeConnections.Count(c => !c.IsCompleted);
public int MaxClients => _maxClients;
public PooledPipeServer(
string fullPipeName,
string channelName,
int maxClients,
EventHub eventHub,
ILogger logger)
{
_channelName = channelName;
_pipeName = fullPipeName.Replace(@"\\.\pipe\", "");
_maxClients = maxClients;
_eventHub = eventHub;
_logger = logger;
_connectionSemaphore = new SemaphoreSlim(maxClients, maxClients);
}
public async Task StartAsync(CancellationToken stoppingToken)
{
_logger.LogInformation($"PooledPipeServer [{_channelName}]: Starting with max {_maxClients} parallel clients");
try
{
// Запускаем несколько серверов параллельно
var serverTasks = new List<Task>();
for (int i = 0; i < _maxClients; i++)
{
serverTasks.Add(RunSinglePipeServerAsync(i, stoppingToken));
}
await Task.WhenAll(serverTasks);
}
catch (OperationCanceledException)
{
_logger.LogInformation($"PooledPipeServer [{_channelName}]: Shutting down...");
}
}
private async Task RunSinglePipeServerAsync(int serverId, CancellationToken stoppingToken)
{
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, _cts.Token);
while (!linkedCts.Token.IsCancellationRequested)
{
NamedPipeServerStream? pipeServer = null;
ClientConnection? client = null;
try
{
// Ожидаем доступный слот в семафоре
await _connectionSemaphore.WaitAsync(linkedCts.Token);
pipeServer = new NamedPipeServerStream(
$"{_pipeName}_{serverId}", // Уникальное имя для каждого экземпляра
PipeDirection.Out,
_maxClients,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
_logger.LogDebug($"PooledPipeServer [{_channelName}]: Server {serverId} waiting for connection...");
await pipeServer.WaitForConnectionAsync(linkedCts.Token);
client = new ClientConnection(pipeServer, $"{_channelName}_{serverId}");
_eventHub.RegisterClient(_channelName, client);
_logger.LogInformation($"PooledPipeServer [{_channelName}]: Client connected to server {serverId} (Active: {ActiveConnections}/{_maxClients})");
// Мониторим соединение в отдельной задаче
var connectionTask = MonitorClientAsync(client, serverId, linkedCts.Token);
_activeConnections.Add(connectionTask);
// Не ждем завершения, чтобы принимать новые подключения
_ = connectionTask.ContinueWith(t =>
{
_connectionSemaphore.Release();
_eventHub.UnregisterClient(_channelName, client);
client.Dispose();
_logger.LogInformation($"PooledPipeServer [{_channelName}]: Client disconnected from server {serverId} (Active: {ActiveConnections}/{_maxClients})");
}, TaskScheduler.Default);
}
catch (OperationCanceledException)
{
_connectionSemaphore.Release();
break;
}
catch (Exception ex)
{
_logger.LogError(ex, $"PooledPipeServer [{_channelName}]: Error in server {serverId}");
_connectionSemaphore.Release();
await Task.Delay(1000, linkedCts.Token);
}
}
}
private async Task MonitorClientAsync(ClientConnection client, int serverId, CancellationToken stoppingToken)
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
if (!client.IsConnected)
{
break;
}
await Task.Delay(500, stoppingToken);
}
}
catch (OperationCanceledException)
{
// Нормальное завершение
}
}
public void Dispose()
{
if (_isDisposed) return;
_isDisposed = true;
_cts.Cancel();
_cts.Dispose();
_connectionSemaphore.Dispose();
}
}
using QuotesServer.BackgroundServices;using QuotesServer.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<EventHub>();builder.Services.AddHostedService<ParallelQuoteServerService>();
var app = builder.Build();
app.MapGet("/", () => @"
<h1>📊 Parallel Quote Server is Running</h1>
<p><strong>Architecture:</strong> Named Pipes IPC Server with Parallel Channels</p>
<p><strong>Active Channels:</strong></p>
<ul>
<li>Tech Stocks (GOOGL, MSFT, NVDA) - Max 3 parallel clients</li>
<li>Consumer Stocks (AMZN, AAPL) - Max 3 parallel clients</li>
<li>Finance Stocks (JPM, BAC, GS) - Max 2 parallel clients</li>
<li>Energy Stocks (XOM, CVX) - Max 2 parallel clients</li>
</ul>
<p>Check logs for real-time parallel quote distribution.</p>
");
app.Run();
Это серверПроанализируй код, сделай код-ревью