Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ systest/bulk_live/live/**/*.rdf
systest/bulk_live/live/**/*.txt
x/log_test/*.enc
*.buf

##
.claude/
CLAUDE.md
2 changes: 1 addition & 1 deletion .trunk/trunk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ plugins:
# Many linters and tools depend on runtimes - configure them here. (https://docs.trunk.io/runtimes)
runtimes:
enabled:
- go@1.24.3
- go@1.25.6
- node@22.16.0
- python@3.10.8

Expand Down
4 changes: 4 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ they form a Raft group and provide synchronous replication.

flag.Bool("mcp", false, "run MCP server along with alpha.")

flag.String("label", "",
"Label for this alpha. Labeled alphas only serve predicates with matching @label directive.")

// By default Go GRPC traces all requests.
grpc.EnableTracing = false

Expand Down Expand Up @@ -750,6 +753,7 @@ func run() {
AclPublicKey: keys.AclPublicKey,
Audit: opts.Audit != nil,
Badger: bopts,
Label: Alpha.Conf.GetString("label"),
}
x.WorkerConfig.Parse(Alpha.Conf)

Expand Down
7 changes: 7 additions & 0 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ func (n *node) handleTablet(tablet *pb.Tablet) error {
if tablet.Force {
originalGroup := state.Groups[prev.GroupId]
delete(originalGroup.Tablets, tablet.Predicate)
} else if tablet.Label != "" && prev.Label != tablet.Label {
// Allow re-routing when labels differ. This happens when a schema with @label
// is applied after the predicate was created without a label.
glog.Infof("Tablet for attr: [%s] re-routing from group %d to %d due to label change (%q -> %q)",
tablet.Predicate, prev.GroupId, tablet.GroupId, prev.Label, tablet.Label)
originalGroup := state.Groups[prev.GroupId]
delete(originalGroup.Tablets, tablet.Predicate)
} else if prev.GroupId != tablet.GroupId {
glog.Infof(
"Tablet for attr: [%s], gid: [%d] already served by group: [%d]\n",
Expand Down
14 changes: 12 additions & 2 deletions dgraph/cmd/zero/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ func (s *Server) movePredicate(predicate string, srcGroup, dstGroup uint32) erro
if tab == nil {
return errors.Errorf("Tablet to be moved: [%v] is not being served", predicate)
}
dstGroupLabel := s.getGroupLabel(dstGroup)
if dstGroupLabel != tab.Label {
// Don't allow a predicate to be moved to a group that doesn't share it's label.
// (label will be empty string on either if unassigned)
return errors.Errorf("Unable to move predicate [%v] with label '%s' to group with label '%s'",
predicate, tab.Label, dstGroupLabel)
}
msg := fmt.Sprintf("Going to move predicate: [%v], size: [ondisk: %v, uncompressed: %v]"+
" from group %d to %d\n", predicate, humanize.IBytes(uint64(tab.OnDiskBytes)),
humanize.IBytes(uint64(tab.UncompressedBytes)), srcGroup, dstGroup)
Expand Down Expand Up @@ -266,11 +273,14 @@ func (s *Server) chooseTablet() (predicate string, srcGroup uint32, dstGroup uin
size := int64(0)
group := s.state.Groups[srcGroup]
for _, tab := range group.Tablets {
// Reserved predicates should always be in group 1 so do not re-balance them.
if x.IsReservedPredicate(tab.Predicate) {
// Reserved predicates should always be in group 1 so do not re-balance them.
continue
}
if tab.Label != "" {
// labeled predicates are pinned and should not be re-balanced either
continue
}

// Finds a tablet as big a possible such that on moving it dstGroup's size is
// less than or equal to srcGroup.
if tab.OnDiskBytes <= sizeDiff/2 && tab.OnDiskBytes > size {
Expand Down
138 changes: 125 additions & 13 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,42 @@ func (s *Server) Inform(ctx context.Context, req *pb.TabletRequest) (*pb.TabletR
// Set the tablet to be served by this server's group.
var proposal pb.ZeroProposal
proposal.Tablets = make([]*pb.Tablet, 0)

for _, t := range unknownTablets {
if x.IsReservedPredicate(t.Predicate) {
// Force all the reserved predicates to be allocated to group 1.
// This is to make it easier to stream ACL updates to all alpha servers
// since they only need to open one pipeline to receive updates for all
// ACL predicates.
// This will also make it easier to restore the reserved predicates after
// a DropAll operation.
t.GroupId = 1
glog.Infof("Zero.Inform: routing tablet %s (label=%q, groupId=%d)", t.Predicate, t.Label, t.GroupId)
// Use closure to ensure lock is always released via defer, even on error paths.
// This pattern prevents lock leaks if new error conditions are added later.
if err := func() error {
s.RLock()
defer s.RUnlock()
switch {
case x.IsReservedPredicate(t.Predicate):
// Force all the reserved predicates to be allocated to group 1.
// This is to make it easier to stream ACL updates to all alpha servers
// since they only need to open one pipeline to receive updates for all
// ACL predicates.
// This will also make it easier to restore the reserved predicates after
// a DropAll operation.
t.GroupId = 1
case t.Label != "":
// Labeled predicate: route to matching labeled group
gid, err := s.labelGroup(t.Label)
if err != nil {
return err
}
glog.Infof("Zero.Inform: labeled predicate %s (label=%q) routed to group %d", t.Predicate, t.Label, gid)
t.GroupId = gid
case s.isLabeledGroup(t.GroupId):
// make sure unlabeled predicates don't go an labeled group
gid, err := s.firstUnlabeledGroup()
if err != nil {
return err
}
t.GroupId = gid
}
return nil
}(); err != nil {
return nil, err
}
proposal.Tablets = append(proposal.Tablets, t)
}
Expand Down Expand Up @@ -681,11 +708,22 @@ func (s *Server) ShouldServe(
// Check who is serving this tablet.
tab := s.ServingTablet(tablet.Predicate)
span.SetAttributes(attribute.String("tablet_predicate", tablet.Predicate))
span.SetAttributes(attribute.String("tablet_label", tablet.Label))
if tab != nil && !tablet.Force {
// Someone is serving this tablet. Could be the caller as well.
// The caller should compare the returned group against the group it holds to check who's
// serving.
return tab, nil
// If the existing tablet has a different label than requested, we need to re-route.
// This can happen when a schema is applied with @label after the predicate was
// created without a label (e.g., during DropAll).
if tablet.Label != "" && tab.Label != tablet.Label {
glog.Infof("ShouldServe: tablet %s has label %q but request has label %q, re-routing",
tablet.Predicate, tab.Label, tablet.Label)
// Fall through to re-assign the tablet with the new label
// The handleTablet function will allow this because labels differ
} else {
// Someone is serving this tablet. Could be the caller as well.
// The caller should compare the returned group against the group it holds to check who's
// serving.
return tab, nil
}
}

// Read-only requests should return an empty tablet instead of asking zero
Expand All @@ -697,15 +735,36 @@ func (s *Server) ShouldServe(
// Set the tablet to be served by this server's group.
var proposal pb.ZeroProposal

if x.IsReservedPredicate(tablet.Predicate) {
// Acquire read lock for label-related lookups
s.RLock()
switch {
case x.IsReservedPredicate(tablet.Predicate):
// Force all the reserved predicates to be allocated to group 1.
// This is to make it easier to stream ACL updates to all alpha servers
// since they only need to open one pipeline to receive updates for all
// ACL predicates.
// This will also make it easier to restore the reserved predicates after
// a DropAll operation.
tablet.GroupId = 1
case tablet.Label != "":
// Labeled predicate: route to matching labeled group
gid, err := s.labelGroup(tablet.Label)
if err != nil {
s.RUnlock()
return nil, err
}
glog.Infof("ShouldServe: labeled predicate %s (label=%q) routed to group %d", tablet.Predicate, tablet.Label, gid)
tablet.GroupId = gid
case s.isLabeledGroup(tablet.GroupId):
// Make sure unlabeled predicates don't go to a labeled group
gid, err := s.firstUnlabeledGroup()
if err != nil {
s.RUnlock()
return nil, err
}
tablet.GroupId = gid
}
s.RUnlock()
proposal.Tablet = tablet
if err := s.Node.proposeAndWait(ctx, &proposal); err != nil && err != errTabletAlreadyServed {
span.AddEvent(fmt.Sprintf("Error proposing tablet: %+v. Error: %v", &proposal, err))
Expand Down Expand Up @@ -862,3 +921,56 @@ func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState
}
return ms, nil
}

// groupLabel returns the label for a group (from first labeled member found)
// Caller must hold the read lock.
func (s *Server) groupLabel(gid uint32) string {
s.AssertRLock()
group := s.state.Groups[gid]
if group == nil {
return ""
}
for _, member := range group.Members {
if member.Label != "" {
return member.Label
}
}
return ""
}

// getGroupLabel is like groupLabel but handles its own locking.
// Use this when calling from code that doesn't already hold the lock.
func (s *Server) getGroupLabel(gid uint32) string {
s.RLock()
defer s.RUnlock()
return s.groupLabel(gid)
}

// labelGroup the group ID that has the given label, or 0 if none
func (s *Server) labelGroup(label string) (uint32, error) {
s.AssertRLock()
for gid, group := range s.state.Groups {
for _, member := range group.Members {
if member.Label == label {
return gid, nil
}
}
}
return 0, errors.Errorf("No alpha group with label '%s' found", label)
}

// isLabeledGroup returns true if any member in the group has a label
func (s *Server) isLabeledGroup(gid uint32) bool {
s.AssertRLock()
return s.groupLabel(gid) != ""
}

func (s *Server) firstUnlabeledGroup() (uint32, error) {
s.AssertRLock()
for gid := range s.state.Groups {
if !s.isLabeledGroup(gid) {
return gid, nil
}
}
return 0, errors.Errorf("No unlabeled alpha groups exist.")
}
8 changes: 7 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
syntax = "proto3";

package pb;

import "github.com/dgraph-io/dgo/v250/protos/api.proto";
import "github.com/dgraph-io/badger/v4/pb/pb.proto";
import "google/protobuf/descriptor.proto";
Expand Down Expand Up @@ -125,6 +124,7 @@ message Member {

bool clusterInfoOnly = 13;
bool forceGroupId = 14;
string label = 15; // Label for sharding affinity
}

message Group {
Expand Down Expand Up @@ -200,6 +200,7 @@ message Tablet {
bool readOnly = 9; // If true, do not ask zero to serve any tablets.
uint64 moveTs = 10;
int64 uncompressed_bytes = 11; // Estimated uncompressed size of tablet in bytes
string label = 12; // Label from predicate schema (for routing)
}

message DirectedEdge {
Expand Down Expand Up @@ -485,6 +486,7 @@ message SchemaUpdate {
bool lang = 9;
bool unique = 14;


// Fields required for type system.
bool non_nullable = 10;
bool non_nullable_list = 11;
Expand All @@ -495,6 +497,10 @@ message SchemaUpdate {

bool no_conflict = 13;

string label = 16; // Label from @label directive (for sharding affinity)



// Deleted field:
reserved 7;
reserved "explicit";
Expand Down
Loading
Loading