I’m pleased to see that bulk loading can read URIs and map to UIDs directly, while saving an xid edge…
I’d like to keep my reference data in that format, but then I can’t easily live load incremental data through the mutations API. I would have to crawl through my file, query Dgraph for the xid → uid mappings that already exist, assign new blank nodes to the xids that don’t, and do all of that outside the database. That’s a lot slower than anything… Couldn’t I live load RDF that has <whatever:node_id> as the subject and have the program do the mapping and the assignment of new UIDs?
import json
import requests

# Configuration: address of the Dgraph Alpha HTTP endpoint
DGRAPH_ENDPOINT = "http://localhost:8080"


def get_existing_xid_mappings():
    """Return a dict {xid: uid} for every node that already has an xid."""
    query = """
    {
      q(func: has(xid)) {
        uid
        xid
      }
    }
    """
    headers = {'Content-Type': 'application/json'}
    response = requests.post(f"{DGRAPH_ENDPOINT}/query", headers=headers, data=json.dumps({"query": query}))
    response.raise_for_status()
    # Dgraph wraps query results in a top-level "data" object
    result = response.json()
    return {node['xid']: node['uid'] for node in result['data']['q']}


def create_new_uids(xids, existing_mappings):
    """Create one node per unknown xid and return a dict {xid: new uid}."""
    new_mappings = {}
    for xid in xids:
        if xid not in existing_mappings:
            # The blank node "_:new" makes Dgraph allocate a fresh UID
            mutation = {"set": [{"uid": "_:new", "xid": xid}]}
            headers = {'Content-Type': 'application/json'}
            response = requests.post(f"{DGRAPH_ENDPOINT}/mutate?commitNow=true", headers=headers, data=json.dumps(mutation))
            response.raise_for_status()
            # Allocated UIDs come back under data.uids, keyed by blank node name
            new_mappings[xid] = response.json()['data']['uids']['new']
    return new_mappings


def extract_subject(line):
    """Return the subject of a simple N-Quad line, or None.

    Very basic: assumes the line starts with <subject>; not a full RDF parser.
    """
    stripped = line.strip()
    if stripped.startswith('<'):
        return stripped.split('>')[0][1:]
    return None


def handle_rdf_data(rdf_data):
    lines = rdf_data.splitlines()
    # Collect the XIDs used as subjects
    xids = {xid for xid in map(extract_subject, lines) if xid}
    existing_mappings = get_existing_xid_mappings()
    new_mappings = create_new_uids(xids, existing_mappings)
    # Rewrite each line so that <xid> becomes <uid>
    updated_rdf = []
    for line in lines:
        xid = extract_subject(line)
        uid = new_mappings.get(xid, existing_mappings.get(xid)) if xid else None
        if uid:
            updated_rdf.append(line.replace(f"<{xid}>", f"<{uid}>"))
        else:
            updated_rdf.append(line)  # Keep the line if we couldn't map the XID
    return '\n'.join(updated_rdf)


def main():
    # Sample RDF data for demonstration
    sample_rdf = """
    <node1> <name> "Node One" .
    <node2> <name> "Node Two" .
    """
    updated_rdf = handle_rdf_data(sample_rdf)
    print(updated_rdf)
    # RDF mutations sent over HTTP must be wrapped in a { set { ... } } block
    headers = {'Content-Type': 'application/rdf'}
    response = requests.post(f"{DGRAPH_ENDPOINT}/mutate?commitNow=true", headers=headers, data=f"{{ set {{ {updated_rdf} }} }}")
    response.raise_for_status()
    print("Mutation completed:", response.text)


if __name__ == "__main__":
    main()
dgraph live will check the predicate and do only upserts. For newly created subjects, it will also save the xid predicate with the value of the blank node, like _:whatever#node_id.
Note that we are storing the _: part of the blank node which is a bit confusing.
There is also an option to speed up the process by maintaining an xidmap.
Dgraph live can store the xidmap (in a folder in compressed format) and use it during the next loading.
I think we can combine both features (I have not tested it): use the xidmap to replace blank nodes with known UIDs, and use the xid predicate to upsert in case data has been injected with a different tool.
Note that the other tool (or API) has to write the xid predicate correctly.
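To illustrate what "writing the xid predicate correctly" could look like from another tool, here is a minimal sketch that builds a DQL upsert block keyed on xid. The helper name build_xid_upsert is hypothetical, the "_:" prefix on the stored value is taken from the description above rather than verified against the Live Loader, and eq(xid, ...) assumes the schema has an index on xid (for example `xid: string @index(exact) .`). Using json.dumps to quote the literals is a shortcut that handles quotes and backslashes, not a full N-Quads escaper.

```python
import json


def build_xid_upsert(blank_node: str, predicate: str, value: str) -> str:
    """Build a DQL upsert block that finds or creates a node by its xid.

    Assumption: the xid value is stored with the "_:" prefix of the blank node.
    """
    xid = blank_node if blank_node.startswith("_:") else f"_:{blank_node}"
    # json.dumps yields a double-quoted literal with quotes/backslashes escaped
    xid_lit = json.dumps(xid)
    value_lit = json.dumps(value)
    return (
        "upsert {\n"
        f"  query {{ v as var(func: eq(xid, {xid_lit})) }}\n"
        "  mutation {\n"
        "    set {\n"
        # uid(v) resolves to the matched node, or to a new node if none matched
        f"      uid(v) <xid> {xid_lit} .\n"
        f"      uid(v) <{predicate}> {value_lit} .\n"
        "    }\n"
        "  }\n"
        "}"
    )


if __name__ == "__main__":
    print(build_xid_upsert("whatever#node_id", "name", "Node One"))
```

The resulting string can be sent as the body of a POST to /mutate?commitNow=true with Content-Type application/rdf. Because the lookup and the write happen in one transaction, the other tool does not need to fetch the xid → uid mapping first.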
FWIW, we can also store the “UID → blank-node-identifier” mapping using the --xidmap feature in the Live Loader. For subsequent incremental loads, just specify the path to the folder that holds the xidmap created by the initial load.
More importantly, there’s no need to create an XID predicate in your schema for this feature to work. In the context of the Live Loader, the term xidmap only describes the UID → blank-node mapping, not a UID → XID predicate mapping, which is easy to misunderstand.
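As a rough sketch of that workflow, the snippet below builds the Live Loader command line for an initial load and for a later incremental load that reuses the same xidmap folder. The helper live_load_command and the file names are hypothetical; only the dgraph live flags --files, --schema, and --xidmap are taken from the Live Loader itself, and connection flags are left at their defaults.

```python
import shlex
from typing import List, Optional


def live_load_command(rdf_file: str, xidmap_dir: str,
                      schema_file: Optional[str] = None) -> List[str]:
    """Return the argv list for a `dgraph live` run that uses an xidmap folder."""
    cmd = ["dgraph", "live", "--files", rdf_file, "--xidmap", xidmap_dir]
    if schema_file:
        cmd += ["--schema", schema_file]
    return cmd


if __name__ == "__main__":
    # First load: creates the xidmap folder alongside loading the data
    first = live_load_command("initial.rdf", "./xidmap", "schema.txt")
    # Incremental load: same folder, so known blank nodes keep their UIDs
    incremental = live_load_command("increment.rdf", "./xidmap")
    print(shlex.join(first))
    print(shlex.join(incremental))
    # To actually run a load: subprocess.run(first, check=True)
```

The key point is that both runs point at the same folder: blank nodes already seen in the first load should resolve to their existing UIDs, and only unseen ones get new UIDs.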