Threading

Some efforts are made to make the implementation thread safe. It is a dedicated effort for each backend. We provide some details here.

SQLite Thread Safe

SQLite connectors are not thread safe objects and can’t be shared among each other.

In order to make a Graph instance usable in a threaded manner we dynamically construct new connectors when threading is detected.

It is handled with the SQLite implementation of the MasterGraphs, which provides the helper connector to the Graph object. The SQLite variant of the MasterGraphs will try to spawn a new helper (hence a new connector) for each thread.

>>> import threading
>>> class Thread(threading.Thread):
...     def __init__(self, Graph, i):
...             self.graph = Graph
...             self.identity = i
...             self.exception_raised = 0
...             super().__init__()
...     def run(self):
...             try:
...                     self.res = list(self.graph[self.identity])
...                     # in Thread access to the db
...             except Exception as e:
...                     self.exception_raised += 1
...     def join(self):
...             self.graph.helper.close()
...             # close the thread connection on join

We can design a threaded access to the graph:

>>> G = nd.sqlite.Graph(db=":tempfile:")
>>> G.add_edges_from(enumerate(range(1, 100))) # Build some line graph
>>> threads = [Thread(G, i) for i in range(3)]
>>> for t in threads:
...             t.start()
>>> for t in threads:
...             t.join()
>>> sum(t.exception_raised for t in threads)
0

In the case of an in-memory SQLite Graph, it is impossible to be thread safe since it is not possible to copy the connector in any way. Hence, it will just raise a classical SQLite threading error. The same comment applies to SQLite Graphs that were not given the path of the SQLite file (e.g., when instantiating the Graph directly providing the connector rather than the SQLite file path).

>>> H = nd.sqlite.Graph()
>>> threads = [Thread(H, i) for i in range(3)]
>>> for t in threads:
...             t.start()
>>> for t in threads:
...     t.join()
>>> sum(t.exception_raised for t in threads)
3
>>> J = nd.sqlite.Graph()
>>> K = nd.sqlite.Graph(db=J.helper.db)
>>> threads = [Thread(K, i) for i in range(3)]
>>> for t in threads:
...             t.start()
>>> for t in threads:
...             t.join()
>>> sum(t.exception_raised for t in threads)
3

The responsibility for closing of the connection falls on the user.

TODO: Can we do better? ANSWER: I don’t think so.