Hey @mrjn, I’ve implemented a version to wrap my head around how the whole query is executed. I wanted to recurse at the ProcessGraph level, but I had trouble accumulating results there since ProcessGraph is launched in a new goroutine from processRequest and also calls itself in a new goroutine for each of its children. So, just to get something working, I recursed at the processRequest level instead. Here’s the code delta in main.go:
// State for the widening-radius "nearest" hack, accumulated across
// recursive calls to processRequest.
var recurseOut = &query.SubGraph{
	DestUIDs: &task.List{Uids: []uint64{}},
}
var recurseN uint64   // number of results requested
var recurseRadius int // current "near" radius
var cyclesRan = 0     // how many widening cycles have run

func processRequest(ctx context.Context, gq *gql.GraphQuery,
	l *query.Latency) (*query.SubGraph, wrappedErr) {
	if gq == nil || (len(gq.UID) == 0 && gq.Func == nil) {
		return &query.SubGraph{}, wrappedErr{nil, x.ErrorOk}
	}

	sg, err := query.ToSubGraph(ctx, gq)
	if err != nil {
		x.TraceError(ctx, x.Wrapf(err, "Error while conversion to internal format"))
		return &query.SubGraph{}, wrappedErr{err, x.ErrorInvalidRequest}
	}
	l.Parsing = time.Since(l.Start)
	x.Trace(ctx, "Query parsed")

	rch := make(chan error)
	// Transform a "nearest" query into a "near" query with a widening radius.
	// (Length guard so pure UID queries with no function don't panic here.)
	if len(sg.SrcFunc) > 2 && sg.SrcFunc[0] == "nearest" {
		if cyclesRan == 0 {
			// We want N results - grab it just once.
			recurseN, _ = strconv.ParseUint(sg.SrcFunc[2], 10, 64)
		}
		recurseRadius += 200 // widen by a fixed interval of 200 each cycle
		sg.SrcFunc[0] = "near"
		sg.SrcFunc[2] = strconv.Itoa(recurseRadius)
		cyclesRan++
	}
	go query.ProcessGraph(ctx, sg, nil, rch)
	err = <-rch
	if err != nil {
		x.TraceError(ctx, x.Wrapf(err, "Error while executing query"))
		return &query.SubGraph{}, wrappedErr{err, x.Error}
	}

	// Accumulate this cycle's results.
	recurseOut.DestUIDs.Uids = append(recurseOut.DestUIDs.Uids, sg.DestUIDs.Uids...)
	// Recurse with a wider radius if this was a nearest query and we don't
	// yet have N results. (Never terminates if fewer than N exist overall.)
	if cyclesRan > 0 && uint64(len(recurseOut.DestUIDs.Uids)) < recurseN {
		return processRequest(ctx, gq, l)
	}
	sg.DestUIDs.Uids = recurseOut.DestUIDs.Uids

	l.Processing = time.Since(l.Start) - l.Parsing
	x.Trace(ctx, "Graph processed")
	if len(*dumpSubgraph) > 0 {
		x.Checkf(os.MkdirAll(*dumpSubgraph, 0700), *dumpSubgraph)
		s := time.Now().Format("20060102.150405.000000.gob")
		filename := path.Join(*dumpSubgraph, s)
		f, err := os.Create(filename)
		x.Checkf(err, filename)
		enc := gob.NewEncoder(f)
		x.Check(enc.Encode(sg))
		x.Checkf(f.Close(), filename)
	}
	return sg, wrappedErr{nil, ""}
}
You can try it with the query in the original post and it does return >10 results. It almost works (performance and style aside), but directly accumulating the DestUIDs and then sticking them into the final result comes back to bite us in the butt in preTraverse: I think the children’s uidMatrix and values only reflect the final cycle, so they no longer line up with the accumulated DestUIDs. I added the following check to prevent the index-out-of-range panic, but it filters out some legitimate answers, so we return fewer results than the query actually found:
func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
	invalidUids := make(map[uint64]bool)
	// We go through all predicate children of the subgraph.
	for _, pc := range sg.Children {
		idx := algo.IndexOf(pc.SrcUIDs, uid)
		if idx < 0 {
			continue
		}
		// There are many more UIDs that are filtered out here. The indices don't match up.
		if len(pc.uidMatrix) <= idx {
			continue
		}
		...
	}
	...
}
One solution is to export the uidMatrix and values fields of the SubGraph struct. Then I could accumulate them directly, the way I do with DestUIDs (see the sketch below). My intuition, though, is that they were unexported for a reason and should stay that way.
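For illustration, here’s a minimal sketch of what that accumulation might look like. Everything here is hypothetical: it assumes the fields get exported as UIDMatrix and Values, that recurseOut is seeded with Children mirroring sg’s, and that nested children would get the same treatment recursively:

// Hypothetical sketch: assumes uidMatrix/values are exported as
// UIDMatrix/Values and recurseOut.Children mirrors sg.Children.
for i, pc := range sg.Children {
	acc := recurseOut.Children[i]
	acc.SrcUIDs.Uids = append(acc.SrcUIDs.Uids, pc.SrcUIDs.Uids...)
	// Append matrix rows and values in lockstep with SrcUIDs so the
	// idx computed in preTraverse stays in range.
	acc.UIDMatrix = append(acc.UIDMatrix, pc.UIDMatrix...)
	acc.Values = append(acc.Values, pc.Values...)
}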
Another solution would be to recurse at the ProcessGraph level. I think this is much preferable, and ideally we could go even further down the chain and recurse at the processTask level, but both are complicated by the fact that these functions run in their own goroutines. I think I’d need to add or modify some fields on the SubGraph struct to accumulate data across recursive calls running in separate goroutines; one possible pattern is sketched below.
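As a concrete example of that accumulation pattern (just a sketch with hypothetical names, not a proposal for the final shape): a mutex-guarded accumulator that gets threaded through the recursion, so each goroutine can append its results safely:

// Hypothetical sketch (needs "sync" imported): a shared accumulator that
// could be passed into ProcessGraph as a new parameter, or carried on a
// new field of SubGraph.
type accumulator struct {
	mu   sync.Mutex
	uids []uint64
}

// add appends one goroutine's DestUIDs under the lock.
func (a *accumulator) add(uids []uint64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.uids = append(a.uids, uids...)
}

Each recursive ProcessGraph call would call add with its sg.DestUIDs.Uids before signaling its error channel, and the top-level caller would only read the accumulated uids after every child channel has reported.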
This is where I’m at now. What are your thoughts?