Skip to content

Commit

Permalink
rabbitmq reverts
Browse files Browse the repository at this point in the history
  • Loading branch information
its-a-feature committed Feb 10, 2025
1 parent 1f27126 commit a694d3d
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 133 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.3.1-rc42] - 2025-02-10

### Changed

- Just reverting the rabbitmq tweaks since the queues aren't getting cleaned up properly

## [3.3.1-rc41] - 2025-02-10

### Changed
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.3.1-rc41
3.3.1-rc42
2 changes: 1 addition & 1 deletion mythic-docker/src/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.3.1-rc41
3.3.1-rc42
247 changes: 116 additions & 131 deletions mythic-docker/src/rabbitmq/utils_rabbitmq_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,30 +152,30 @@ func (r *rabbitMQConnection) GetConnection() (*amqp.Connection, error) {
defer r.mutex.Unlock()
if r.conn != nil && !r.conn.IsClosed() {
return r.conn, nil
}
for {
logging.LogInfo("Attempting to connect to rabbitmq")
conn, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s:%d/%s",
utils.MythicConfig.RabbitmqUser,
utils.MythicConfig.RabbitmqPassword,
utils.MythicConfig.RabbitmqHost,
utils.MythicConfig.RabbitmqPort,
utils.MythicConfig.RabbitmqVHost),
amqp.Config{
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 10*time.Second)
} else {
for {
logging.LogInfo("Attempting to connect to rabbitmq")
conn, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s:%d/%s",
utils.MythicConfig.RabbitmqUser,
utils.MythicConfig.RabbitmqPassword,
utils.MythicConfig.RabbitmqHost,
utils.MythicConfig.RabbitmqPort,
utils.MythicConfig.RabbitmqVHost),
amqp.Config{
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 10*time.Second)
},
},
},
)
if err != nil {
logging.LogError(err, "Failed to connect to rabbitmq")
time.Sleep(RETRY_CONNECT_DELAY)
continue
)
if err != nil {
logging.LogError(err, "Failed to connect to rabbitmq")
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
r.conn = conn
return conn, nil
}
r.conn = conn
return conn, nil
}

}
func (r *rabbitMQConnection) SendMessageMutexChannel(queue string, exchange string, correlationId string, body []byte, ignoreErrormessage bool) error {
// lock overall to make sure we don't double-create the entry
Expand Down Expand Up @@ -260,19 +260,19 @@ func (r *rabbitMQConnection) SendMessageMutexChannel(queue string, exchange stri
return nil
}
func (r *rabbitMQConnection) SendStructMessage(exchange string, queue string, correlationId string, body interface{}, ignoreErrorMessage bool) error {
jsonBody, err := json.Marshal(body)
if err != nil {
if jsonBody, err := json.Marshal(body); err != nil {
return err
} else {
return r.SendMessage(exchange, queue, correlationId, jsonBody, ignoreErrorMessage)
}
return r.SendMessage(exchange, queue, correlationId, jsonBody, ignoreErrorMessage)
}
func (r *rabbitMQConnection) SendRPCStructMessage(exchange string, queue string, body interface{}) ([]byte, error) {
inputBytes, err := json.Marshal(body)
if err != nil {
if inputBytes, err := json.Marshal(body); err != nil {
logging.LogError(err, "Failed to convert input to JSON", "input", body)
return nil, err
} else {
return r.SendRPCMessage(exchange, queue, inputBytes, true)
}
return r.SendRPCMessage(exchange, queue, inputBytes, true)
}
func (r *rabbitMQConnection) SendMessage(exchange string, queue string, correlationId string, body []byte, ignoreErrormessage bool) error {
// to send a normal message out to a direct queue set:
Expand All @@ -288,94 +288,95 @@ func (r *rabbitMQConnection) SendMessage(exchange string, queue string, correlat

}
func (r *rabbitMQConnection) SendRPCMessage(exchange string, queue string, body []byte, exclusiveQueue bool) ([]byte, error) {
conn, err := r.GetConnection()
if err != nil {
if conn, err := r.GetConnection(); err != nil {
return nil, err
}
ch, err := conn.Channel()
if err != nil {
logging.LogError(err, "Failed to open rabbitmq channel", "queue", queue)
} else if ch, err := conn.Channel(); err != nil {
logging.LogError(err, "Failed to open rabbitmq channel")
return nil, err
}
err = ch.Confirm(false)
if err != nil {
logging.LogError(err, "Channel could not be put into confirm mode", "queue", queue)
} else if err := ch.Confirm(false); err != nil {
logging.LogError(err, "Channel could not be put into confirm mode")
ch.Close()
return nil, err
}
err = ch.ExchangeDeclare(
} else if err = ch.ExchangeDeclare(
exchange, // exchange name
"direct", // type of exchange, ex: topic, fanout, direct, etc
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
logging.LogError(err, "Failed to declare exchange", "exchange", exchange, "exchange_type", "direct", "retry_wait_time", RETRY_CONNECT_DELAY, "queue", queue)
); err != nil {
logging.LogError(err, "Failed to declare exchange", "exchange", exchange, "exchange_type", "direct", "retry_wait_time", RETRY_CONNECT_DELAY)
return nil, err
}
msgs, err := ch.Consume(
} else if msgs, err := ch.Consume(
"amq.rabbitmq.reply-to", // queue name
"", // consumer
true, // auto-ack
exclusiveQueue, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
logging.LogError(err, "Failed to start consuming for RPC replies", "queue", queue)
); err != nil {
logging.LogError(err, "Failed to start consuming for RPC replies")
ch.Close()
return nil, err
}
defer ch.Close()
msg := amqp.Publishing{
ContentType: "application/json",
CorrelationId: uuid.NewString(),
Body: body,
ReplyTo: "amq.rabbitmq.reply-to",
}
for attempt := 0; attempt < 3; attempt++ {
} else {
defer ch.Close()
msg := amqp.Publishing{
ContentType: "application/json",
CorrelationId: uuid.NewString(),
Body: body,
ReplyTo: "amq.rabbitmq.reply-to",
}
if err = ch.Publish(
exchange, // exchange
queue, // routing key
true, // mandatory
false, // immediate
msg, // publishing
); err != nil {
logging.LogError(err, "there was an error publishing a message, trying again", "queue", queue)
continue
logging.LogError(err, "there was an error publishing a message", "queue", queue)
return nil, err
}
select {
case ntf := <-ch.NotifyPublish(make(chan amqp.Confirmation, 1)):
if !ntf.Ack {
err = errors.New("Failed to deliver message, not ACK-ed by receiver")
logging.LogError(err, "failed to deliver message to exchange/queue, notifyPublish, trying again", "queue", queue)
continue
err := errors.New("Failed to deliver message, not ACK-ed by receiver")
logging.LogError(err, "failed to deliver message to exchange/queue, notifyPublish")
return nil, err
}
case ret := <-ch.NotifyReturn(make(chan amqp.Return, 1)):
err = errors.New(getMeaningfulRabbitmqError(ret))
continue
case ret := <-ch.NotifyReturn(make(chan amqp.Return)):
err := errors.New(getMeaningfulRabbitmqError(ret))
return nil, err
case <-time.After(RPC_TIMEOUT):
err = errors.New("message delivery confirmation timed out in SendRPCMessage")
logging.LogError(err, "message delivery confirmation to exchange/queue timed out, trying again", "queue", queue)
continue
err := errors.New("message delivery confirmation timed out in SendRPCMessage")
logging.LogError(err, "message delivery confirmation to exchange/queue timed out", "queue", queue)
return nil, err
}
//logging.LogDebug("Sent RPC message", "queue", queue)
select {
case m := <-msgs:
//logging.LogDebug("Got RPC Reply", "queue", queue)
return m.Body, nil
case <-time.After(RPC_TIMEOUT):
logging.LogError(nil, "Timeout reached waiting for RPC reply, trying again", "queue", queue)
err = errors.New("timeout reached waiting for RPC reply")
continue
logging.LogError(nil, "Timeout reached waiting for RPC reply")
return nil, errors.New("Timeout reached waiting for RPC reply")
}
}
logging.LogError(err, "failed 3 times")
return nil, err

/*
_, err = ch.QueueDeclarePassive(
queue, // name, queue
false, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logging.LogError(err, "Failed to declare queue, RPC endpoint doesn't exist", "retry_wait_time", RETRY_CONNECT_DELAY)
return nil, err
}*/
}
func (r *rabbitMQConnection) ReceiveFromMythicDirectExchange(exchange string, queue string, routingKey string, handler QueueHandler, exclusiveQueue bool) {
// exchange is a direct exchange
Expand Down Expand Up @@ -459,108 +460,92 @@ func (r *rabbitMQConnection) ReceiveFromMythicDirectExchange(exchange string, qu
}
func (r *rabbitMQConnection) ReceiveFromRPCQueue(exchange string, queue string, routingKey string, handler RPCQueueHandler, exclusiveQueue bool) {
for {
conn, err := r.GetConnection()
if err != nil {
if conn, err := r.GetConnection(); err != nil {
logging.LogError(err, "Failed to connect to rabbitmq", "retry_wait_time", RETRY_CONNECT_DELAY)
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
ch, err := conn.Channel()
if err != nil {
} else if ch, err := conn.Channel(); err != nil {
logging.LogError(err, "Failed to open rabbitmq channel", "retry_wait_time", RETRY_CONNECT_DELAY)
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
err = ch.ExchangeDeclare(
} else if err = ch.ExchangeDeclare(
exchange, // exchange name
"direct", // type of exchange, ex: topic, fanout, direct, etc
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
); err != nil {
logging.LogError(err, "Failed to declare exchange", "exchange", exchange, "exchange_type", "direct", "retry_wait_time", RETRY_CONNECT_DELAY)
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
q, err := ch.QueueDeclare(
} else if q, err := ch.QueueDeclare(
queue, // name, queue
false, // durable
true, // delete when unused
exclusiveQueue, // exclusive
false, // no-wait
nil,
)
if err != nil {
logging.LogError(err, "Failed to declare queue", "retry_wait_time", RETRY_CONNECT_DELAY, "queue", queue)
nil, // arguments
); err != nil {
logging.LogError(err, "Failed to declare queue", "retry_wait_time", RETRY_CONNECT_DELAY)
ch.Close()
time.Sleep(RETRY_CONNECT_DELAY)
continue
}
err = ch.QueueBind(
} else if err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exchange, // exchange name
false, // nowait
nil, // arguments
)
if err != nil {
logging.LogError(err, "Failed to bind to queue to receive messages", "retry_wait_time", RETRY_CONNECT_DELAY, "queue", queue)
); err != nil {
logging.LogError(err, "Failed to bind to queue to receive messages", "retry_wait_time", RETRY_CONNECT_DELAY)
time.Sleep(RETRY_CONNECT_DELAY)
ch.Close()
continue
}
msgs, err := ch.Consume(
} else if msgs, err := ch.Consume(
q.Name, // queue name
"", // consumer
false, // auto-ack
exclusiveQueue, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
); err != nil {
logging.LogError(err, "Failed to start consuming messages on queue", "queue", q.Name)
ch.Close()
continue
}
forever := make(chan bool)
go func() {
for d := range msgs {
responseMsg := handler(d)
responseMsgJson, err := json.Marshal(responseMsg)
if err != nil {
logging.LogError(err, "Failed to generate JSON for response response", "queue", queue)
continue
}
err = ch.Publish(
"", // exchange
d.ReplyTo, //routing key
true, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: responseMsgJson,
CorrelationId: d.CorrelationId,
})
if err != nil {
logging.LogError(err, "Failed to send message", "queue", queue)
continue
}
err = ch.Ack(d.DeliveryTag, false)
if err != nil {
logging.LogError(err, "Failed to Ack message", "queue", queue)
continue
} else {
forever := make(chan bool)
go func() {
for d := range msgs {
responseMsg := handler(d)
if responseMsgJson, err := json.Marshal(responseMsg); err != nil {
logging.LogError(err, "Failed to generate JSON for getFile response")
continue
} else if err = ch.Publish(
"", // exchange
d.ReplyTo, //routing key
true, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: responseMsgJson,
CorrelationId: d.CorrelationId,
}); err != nil {
logging.LogError(err, "Failed to send message")
} else if err = ch.Ack(d.DeliveryTag, false); err != nil {
logging.LogError(err, "Failed to Ack message")
}
}
}
forever <- true
}()
logging.LogInfo("Started listening for rpc messages", "exchange", exchange, "queue", queue, "routingKey", routingKey)
<-forever
ch.Close()
logging.LogError(nil, "Stopped listening for messages", "exchange", exchange, "queue", queue, "routingKey", routingKey)
forever <- true
}()
logging.LogInfo("Started listening for rpc messages", "exchange", exchange, "queue", queue, "routingKey", routingKey)
<-forever
ch.Close()
logging.LogError(nil, "Stopped listening for messages", "exchange", exchange, "queue", queue, "routingKey", routingKey)
}

}
}
func (r *rabbitMQConnection) CheckConsumerExists(exchange string, queue string, exclusiveQueue bool) (bool, error) {
Expand Down

0 comments on commit a694d3d

Please sign in to comment.