diff --git a/pkg/broker.go b/pkg/broker.go index 880b9f3..9a75b78 100644 --- a/pkg/broker.go +++ b/pkg/broker.go @@ -165,13 +165,7 @@ func (b *Broker) RegisterSubscriber(topic string, fn interface{}) error { return errors.New("handler func must have outcome argument") } - if len(b.subscriber.handlers) > 0 { - if b.subscriber.handlers[0].reqEl != reqType.Elem() { - return errors.New("first arguments for all handlers must have equal types") - } - } - - h := &handler{method: refFn, reqEl: reqType.Elem()} + h := &handler{method: refFn, reqEl: reqType.Elem(), topic: topic} b.subscriber.handlers = append(b.subscriber.handlers, h) b.subscriber.ext[key] = true diff --git a/pkg/broker_test.go b/pkg/broker_test.go index f7f1c4d..a6abdaf 100644 --- a/pkg/broker_test.go +++ b/pkg/broker_test.go @@ -200,13 +200,12 @@ func TestBroker_RegisterSubscriber_HandlerIncorrectFirstArgTypes(t *testing.T) { } err = b.RegisterSubscriber("test", fn2) - assert.Error(t, err) - assert.Equal(t, "first arguments for all handlers must have equal types", err.Error()) + assert.NoError(t, err) broker, ok := b.(*Broker) assert.True(t, ok) - assert.Len(t, broker.subscriber.handlers, 1) + assert.Len(t, broker.subscriber.handlers, 2) } func TestBroker_RegisterSubscriber_HandlerMoreOneHandlersCorrect(t *testing.T) { @@ -267,7 +266,6 @@ func TestBroker_InitSubscriber(t *testing.T) { sub := broker.initSubscriber(topic) assert.Equal(t, broker.rabbitMQ, sub.rabbit) - assert.Equal(t, topic, sub.topic) assert.Len(t, sub.handlers, 0) assert.Len(t, sub.ext, 0) diff --git a/pkg/mocks/BrokerInterface.go b/pkg/mocks/BrokerInterface.go index 2888a0e..fae47bc 100644 --- a/pkg/mocks/BrokerInterface.go +++ b/pkg/mocks/BrokerInterface.go @@ -2,12 +2,9 @@ package mocks -import ( - amqp "github.com/streadway/amqp" - mock "github.com/stretchr/testify/mock" - - proto "github.com/golang/protobuf/proto" -) +import amqp "github.com/streadway/amqp" +import mock "github.com/stretchr/testify/mock" +import proto "github.com/golang/protobuf/proto" // BrokerInterface is an autogenerated mock type for the BrokerInterface type type BrokerInterface struct { diff --git a/pkg/rabbitmq.go b/pkg/rabbitmq.go index c98a0d8..20810c9 100644 --- a/pkg/rabbitmq.go +++ b/pkg/rabbitmq.go @@ -18,6 +18,9 @@ const ( OptImmediate = "immediate" OptInternal = "internal" + HeaderXDeath = "x-death" + HeaderRoutingKeys = "routing-keys" + errorNicConnection = "connection not open" errorNilChannel = "channel not open" ) diff --git a/pkg/subscriber.go b/pkg/subscriber.go index cec6d08..92b2d51 100644 --- a/pkg/subscriber.go +++ b/pkg/subscriber.go @@ -3,6 +3,7 @@ package rabbitmq import ( "bytes" "errors" + "fmt" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/streadway/amqp" @@ -24,10 +25,10 @@ const ( type handler struct { method reflect.Value reqEl reflect.Type + topic string } type subscriber struct { - topic string handlers []*handler fn func(msg amqp.Delivery) @@ -47,7 +48,6 @@ type subscriber struct { func (b *Broker) initSubscriber(topic string) (subs *subscriber) { subs = &subscriber{ - topic: topic, rabbit: b.rabbitMQ, handlers: []*handler{}, ext: make(map[string]bool), @@ -87,15 +87,29 @@ func (s *subscriber) Subscribe() (err error) { return } + var handlerExists = false + for _, h := range s.handlers { + if msg.RoutingKey == h.topic { + handlerExists = true + } else if deathHeaders, ok := msg.Headers[HeaderXDeath]; ok && msg.RoutingKey == defaultQueueBindKey { + routingKey := s.getFirstRoutingKeyFromDeathLetter(deathHeaders) + + if routingKey == h.topic { + msg.RoutingKey = routingKey + handlerExists = true + } + } + + if !handlerExists { + continue + } + st := reflect.New(h.reqEl).Interface().(proto.Message) if msg.ContentType == protobufContentType { - err = proto.Unmarshal(msg.Body, st) - } else if msg.ContentType == jsonContentType { - err = jsonpb.Unmarshal(bytes.NewReader(msg.Body), st) } @@ -104,7 +118,7 @@ func (s *subscriber) Subscribe() (err error) { _ = msg.Nack(false, false) } log.Printf("[*] Cannot unmarshal message, message skipped. \n Error: %s \n Message: %s \n", err.Error(), string(msg.Body)) - continue + break } returnValues := h.method.Call([]reflect.Value{reflect.ValueOf(st), reflect.ValueOf(msg)}) @@ -126,6 +140,15 @@ func (s *subscriber) Subscribe() (err error) { _ = msg.Ack(false) } } + + break + } + + if !handlerExists { + log.Printf("[*] Unable to find handler for the routing key %s. Message skipped. \n Message: %s \n", msg.RoutingKey, string(msg.Body)) + if s.opts.ConsumeOpts.Opts[OptAutoAck] == false { + _ = msg.Reject(false) + } } } @@ -228,3 +251,22 @@ func (s *subscriber) consume() (dls <-chan amqp.Delivery, err error) { return } + +func (s *subscriber) getFirstRoutingKeyFromDeathLetter(deathHeaders interface{}) string { + if len(deathHeaders.([]interface{})) < 1 { + return "" + } + + deathHeader := deathHeaders.([]interface{})[0] + routingKeys, ok := deathHeader.(amqp.Table)[HeaderRoutingKeys] + + if !ok { + return "" + } + + if len(routingKeys.([]interface{})) < 1 { + return "" + } + + return fmt.Sprint(routingKeys.([]interface{})[0]) +} diff --git a/pkg/subscriber_test.go b/pkg/subscriber_test.go index fac6b8d..4943957 100644 --- a/pkg/subscriber_test.go +++ b/pkg/subscriber_test.go @@ -130,4 +130,4 @@ func TestSubscriber_Consume_Error(t *testing.T) { _, err = broker.subscriber.consume() assert.NotNil(t, err) -} \ No newline at end of file +}