Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Admin interface {

GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error)
FetchAllTopicList(ctx context.Context) (*TopicList, error)
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
GetBrokerClusterInfo(ctx context.Context) (*BrokerClusterInfo, error)
FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
FetchClusterList(topic string) ([]string, error)
Close() error
Expand Down Expand Up @@ -164,6 +164,26 @@ func (a *admin) FetchAllTopicList(ctx context.Context) (*TopicList, error) {
return &topicList, nil
}

func (a *admin) GetBrokerClusterInfo(ctx context.Context) (*BrokerClusterInfo, error) {
cmd := remote.NewRemotingCommand(internal.ReqGetBrokerClusterInfo, nil, nil)
response, err := a.cli.InvokeSync(ctx, a.cli.GetNameSrv().AddrList()[0], cmd, 3*time.Second)
if err != nil {
rlog.Error("Get broker cluster info error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
}
var clusterInfo BrokerClusterInfo
_, err = clusterInfo.Decode(response.Body, &clusterInfo)
if err != nil {
rlog.Error("Get broker cluster info decode error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
}
return &clusterInfo, nil
}

// CreateTopic create topic.
// TODO: another implementation like sarama, without brokerAddr as input
func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
Expand Down
62 changes: 61 additions & 1 deletion admin/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ limitations under the License.

package admin

import "encoding/json"
import (
"encoding/json"
"regexp"

"github.com/tidwall/gjson"
)

type RemotingSerializable struct {
}
Expand Down Expand Up @@ -86,3 +91,58 @@ type SubscriptionGroupConfig struct {
WhichBrokerWhenConsumeSlowly int
NotifyConsumerIdsChangedEnable bool
}

type BrokerClusterInfo struct {
BrokerAddrTable map[string]ClusterBrokerData `json:"brokerAddrTable"`
ClusterAddrTable map[string][]string `json:"clusterAddrTable"`
}

type ClusterBrokerData struct {
Cluster string `json:"cluster"`
BrokerName string `json:"brokerName"`
BrokerAddrs map[string]string `json:"brokerAddrs"`
}

// normalizeNumericObjectKeys {0:"ip"} -> {"0":"ip"}
func normalizeNumericObjectKeys(raw string) string {
re := regexp.MustCompile(`([\{,]\s*)(\d+)(\s*:)`)
return re.ReplaceAllString(raw, `$1"$2"$3`)
}

func (info *BrokerClusterInfo) Decode(data []byte, classOfT interface{}) (interface{}, error) {
res := gjson.ParseBytes(data)

info.BrokerAddrTable = make(map[string]ClusterBrokerData)
info.ClusterAddrTable = make(map[string][]string)

res.Get("brokerAddrTable").ForEach(func(k, v gjson.Result) bool {
brokerName := k.String()
raw := v.Get("brokerAddrs").Raw
if raw == "" {
raw = v.Get("brokerAddrs").String()
}
fixed := normalizeNumericObjectKeys(raw)
addrs := make(map[string]string)
_ = json.Unmarshal([]byte(fixed), &addrs)

info.BrokerAddrTable[brokerName] = ClusterBrokerData{
Cluster: v.Get("cluster").String(),
BrokerName: v.Get("brokerName").String(),
BrokerAddrs: addrs,
}
return true
})

res.Get("clusterAddrTable").ForEach(func(k, v gjson.Result) bool {
cluster := k.String()
list := make([]string, 0, len(v.Array()))
v.ForEach(func(_, item gjson.Result) bool {
list = append(list, item.String())
return true
})
info.ClusterAddrTable[cluster] = list
return true
})

return info, nil
}
33 changes: 33 additions & 0 deletions examples/admin/cluster/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2/admin"
"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {

nameSrvAddr := []string{"127.0.0.1:9876"}

testAdmin, err := admin.NewAdmin(
admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)),
admin.WithCredentials(primitive.Credentials{
AccessKey: "RocketMQ",
SecretKey: "12345678",
}),
)

// cluster info
result, err := testAdmin.GetBrokerClusterInfo(context.Background())
if err != nil {
fmt.Println("GetBrokerClusterInfo error:", err.Error())
}
fmt.Println(result)

err = testAdmin.Close()
if err != nil {
fmt.Printf("Shutdown admin error: %s", err.Error())
}
}