新北市网站建设_网站建设公司_服务器部署_seo优化
2026/1/17 18:55:19 网站建设 项目流程

Bus is to be used to inform or broadcast the mutated state and command that need to be processed by multiple services

 

  

Scenario 1 ProductService received rest post message to persist a product and publish the product persistence message as event to rabbitmq

"rabbitMq": {"namespace": "Matt-Product","retries": 3,"retryInterval": 2,"username": "guest","password": "guest","virtualHost": "/","port": 5672,"hostnames": ["10.89.24.148"],"requestTimeout": "00:00:10","publishConfirmTimeout": "00:00:01","recoveryInterval": "00:00:10","persistentDeliveryMode": true,"autoCloseConnection": true,"automaticRecovery": true,"topologyRecovery": true,"exchange": {"durable": true,"autoDelete": false,"type": "Topic"},"queue": {"autoDelete": false,"durable": true,"exclusive": false}},
RabbitMq config section

 

using Common.Handlers;
using Common.RabbitMQ;
using Common.Repo;
using ProductService.Commands;
using ProductService.Events;
using ProductService.Models;
using System.Linq;
using System.Threading.Tasks;namespace ProductService.CommandHandlers
{public class NewProductCommandHandler : ICommandHandler<NewProductCommand>{IBusPublisher _busPublisher;public NewProductCommandHandler(IBusPublisher busPublisher){_busPublisher = busPublisher;} public async Task  HandleAsync(NewProductCommand command, ICorrelationContext context){var enumerator = DataStore<Product>.GetInstance().GetRecords(i=>i.Name == command.Name) ;if (enumerator.Count() == 0 ){DataStore<Product>.GetInstance().AddRecord(new Product(command.Id, command.Name, command.Category, command.Price));//Send product created event bus msgawait _busPublisher.PublishAsync<ProductCreated>( new ProductCreated { Id = command.Id, Name = command.Name }, context);}else{//Send rejected bus message
            }}}
}
NewProductCommandHandler

 

 

Scenario 2 OrderService and OperationService subscribe to the event and proceed with their own handling logic

 

using Common.Messages;
using Common.Handlers;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RawRabbit;
using System;
using RawRabbit.Common;
using System.Threading.Tasks;
using System.Reflection;
using Polly;
using RawRabbit.Enrichers.MessageContext;
using Common.Exceptions;namespace Common.RabbitMQ
{public class BusSubscriber : IBusSubscriber{private readonly ILogger _logger;private readonly IBusClient _busClient;private readonly IServiceProvider _serviceProvider;private readonly string _defaultNamespace;private readonly int _retries;private readonly int _retryInterval;public BusSubscriber(IApplicationBuilder app){_logger = app.ApplicationServices.GetService<ILogger<BusSubscriber>>();_serviceProvider = app.ApplicationServices.GetService<IServiceProvider>();_busClient = _serviceProvider.GetService<IBusClient>();var options = _serviceProvider.GetService<RabbitMqOptions>();_defaultNamespace = options.Namespace;_retries = options.Retries >= 0 ? options.Retries : 3;_retryInterval = options.RetryInterval > 0 ? options.RetryInterval : 2;}public IBusSubscriber SubscribeCommand<TCommand>(string @namespace = null, string queueName = null,Func<TCommand, Exception, IRejectedEvent> onError = null)where TCommand : ICommand{_busClient.SubscribeAsync<TCommand, CorrelationContext>(async (command, correlationContext) =>{var commandHandler = _serviceProvider.GetService<ICommandHandler<TCommand>>();return await TryHandleAsync(command, correlationContext,() => commandHandler.HandleAsync(command, correlationContext), onError);},ctx => ctx.UseSubscribeConfiguration(cfg =>cfg.FromDeclaredQueue(q => q.WithName(GetQueueName<TCommand>(@namespace, queueName)))));return this;}public IBusSubscriber SubscribeEvent<TEvent>(string @namespace = null, string queueName = null,Func<TEvent, Exception, IRejectedEvent> onError = null)where TEvent : IEvent{_busClient.SubscribeAsync<TEvent, CorrelationContext>(async (@event, correlationContext) =>{var eventHandler = _serviceProvider.GetService<IEventHandler<TEvent>>();return await TryHandleAsync(@event, correlationContext,() => eventHandler.HandleAsync(@event, correlationContext), onError);},ctx => ctx.UseSubscribeConfiguration(cfg =>cfg.FromDeclaredQueue(q => q.WithName(GetQueueName<TEvent>(@namespace, queueName)))));return this;}// Internal retry for services that subscribe to the multiple events of the same types.// It does not interfere with the routing keys and wildcards (see TryHandleWithRequeuingAsync() below).private async Task<Acknowledgement> TryHandleAsync<TMessage>(TMessage message,CorrelationContext correlationContext,Func<Task> handle, Func<TMessage, Exception, IRejectedEvent> onError = null){var currentRetry = 0;var retryPolicy = Policy.Handle<Exception>().WaitAndRetryAsync(_retries, i => TimeSpan.FromSeconds(_retryInterval));var messageName = message.GetType().Name;return await retryPolicy.ExecuteAsync<Acknowledgement>(async ()=>{try{var retryMessage = currentRetry == 0? string.Empty: $"Retry: {currentRetry}'.";_logger.LogInformation($"Handling a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}'. {retryMessage}");await handle();_logger.LogInformation($"Handled a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}'. {retryMessage}");return new Ack();}//catch (CustomizedException<IEventHandler<IEvent>> exception)//{//    System.Diagnostics.Debug.WriteLine(exception.Message);//    return new Ack();//}catch (Exception exception){currentRetry++;_logger.LogError(exception, exception.Message);if (exception.GetType().FullName.Contains("CustomizedException") && onError != null){var rejectedEvent = onError(message, exception);await _busClient.PublishAsync(rejectedEvent, ctx => ctx.UseMessageContext(correlationContext));_logger.LogInformation($"Published a rejected event: '{rejectedEvent.GetType().Name}' " +$"for the message: '{messageName}' with correlation id: '{correlationContext.Id}'.");return new Ack();}throw new Exception($"Unable to handle a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}', " +$"retry {currentRetry - 1}/{_retries}...");}});}// RabbitMQ retry that will publish a message to the retry queue.// Keep in mind that it might get processed by the other services using the same routing key and wildcards.private async Task<Acknowledgement> TryHandleWithRequeuingAsync<TMessage>(TMessage message,CorrelationContext correlationContext,Func<Task> handle, Func<TMessage, Exception, IRejectedEvent> onError = null){var messageName = message.GetType().Name;var retryMessage = correlationContext.Retries == 0? string.Empty: $"Retry: {correlationContext.Retries}'.";_logger.LogInformation($"Handling a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}'. {retryMessage}");try{await handle();_logger.LogInformation($"Handled a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}'. {retryMessage}");return new Ack();}catch (Exception exception){_logger.LogError(exception, exception.Message);if (exception is Exception dShopException && onError != null){var rejectedEvent = onError(message, dShopException);await _busClient.PublishAsync(rejectedEvent, ctx => ctx.UseMessageContext(correlationContext));_logger.LogInformation($"Published a rejected event: '{rejectedEvent.GetType().Name}' " +$"for the message: '{messageName}' with correlation id: '{correlationContext.Id}'.");return new Ack();}if (correlationContext.Retries >= _retries){await _busClient.PublishAsync(RejectedEvent.For(messageName),ctx => ctx.UseMessageContext(correlationContext));throw new Exception($"Unable to handle a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}' " +$"after {correlationContext.Retries} retries.", exception);}_logger.LogInformation($"Unable to handle a message: '{messageName}' " +$"with correlation id: '{correlationContext.Id}', " +$"retry {correlationContext.Retries}/{_retries}...");return Retry.In(TimeSpan.FromSeconds(_retryInterval));}}private string GetQueueName<T>(string @namespace = null, string name = null){@namespace = string.IsNullOrWhiteSpace(@namespace)? (string.IsNullOrWhiteSpace(_defaultNamespace) ? string.Empty : _defaultNamespace): @namespace;var separatedNamespace = string.IsNullOrWhiteSpace(@namespace) ? string.Empty : $"{@namespace}.";return (string.IsNullOrWhiteSpace(name)? $"{Assembly.GetEntryAssembly().GetName().Name}/{separatedNamespace}{typeof(T).Name.Underscore()}": $"{name}/{separatedNamespace}{typeof(T).Name.Underscore()}").ToLowerInvariant();}}
}
BusSubscriber

 

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询