Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 9842b9b

Browse files
author
Alex Walker
authored
When closing a client, also close all its sessions (#191)
## What is the goal of this PR? The `close` method of `GraknClient` now automatically closes all `Session` objects that were opened from that client. ## What are the changes implemented in this PR? The `close` method of `GraknClient` now automatically closes all `Session` objects that were opened from that client.
1 parent 8f54a7f commit 9842b9b

File tree

5 files changed

+24
-8
lines changed

5 files changed

+24
-8
lines changed

dependencies/graknlabs/artifacts.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@ def graknlabs_grakn_cluster_artifacts():
3737
artifact_name = "grakn-cluster-all-{platform}-{version}.{ext}",
3838
tag_source = deployment_private["artifact.release"],
3939
commit_source = deployment_private["artifact.snapshot"],
40-
commit = "cc3382fe6be239973f1e7d8437a71683e3dff809",
40+
commit = "a5cf161f63b33010009c63e2b955d2fa81e4e736",
4141
)

grakn/client.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,15 @@ def __init__(self, address: str):
9191
self._address = address
9292
self._channel = grpc.insecure_channel(self._address)
9393
self._databases = _DatabaseManagerRPC(self._channel)
94+
self._sessions: Dict[bytes, _SessionRPC] = {}
9495
self._is_open = True
9596

9697
def session(self, database: str, session_type: SessionType, options=None) -> Session:
9798
if not options:
9899
options = GraknOptions.core()
99-
return _SessionRPC(self, database, session_type, options)
100+
session = _SessionRPC(self, database, session_type, options)
101+
self._sessions[session.session_id()] = session
102+
return session
100103

101104
def databases(self):
102105
return self._databases
@@ -105,6 +108,8 @@ def is_open(self):
105108
return self._is_open
106109

107110
def close(self):
111+
for session_id in self._sessions:
112+
self._sessions[session_id].close()
108113
self._channel.close()
109114
self._is_open = False
110115

@@ -121,6 +126,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
121126
else:
122127
return False
123128

129+
def remove_session(self, session: _SessionRPC):
130+
del self._sessions[session.session_id()]
131+
124132
def channel(self):
125133
return self._channel
126134

grakn/rpc/session.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,13 @@
2020
import sched
2121
import time
2222
from abc import ABC, abstractmethod
23-
from threading import Thread
23+
from threading import Thread, Lock
2424

2525
import grakn_protocol.protobuf.session_pb2 as session_proto
2626
import grpc
2727
from grakn_protocol.protobuf.grakn_pb2_grpc import GraknStub
28-
from grpc import RpcError
2928

3029
from grakn import grakn_proto_builder
31-
from grakn.common.exception import GraknClientException
3230
from grakn.options import GraknOptions
3331
from grakn.rpc.database import Database, _DatabaseRPC
3432
from grakn.rpc.transaction import Transaction, TransactionType
@@ -83,19 +81,21 @@ class _SessionRPC(Session):
8381
def __init__(self, client, database: str, session_type: SessionType, options: GraknOptions = None):
8482
if not options:
8583
options = GraknOptions.core()
84+
self._client = client
8685
self._address = client._address
8786
self._channel = grpc.insecure_channel(client._address)
8887
self._scheduler = sched.scheduler(time.time, time.sleep)
8988
self._database = _DatabaseRPC(database_manager=client.databases(), name=database)
9089
self._session_type = session_type
9190
self._grpc_stub = GraknStub(self._channel)
91+
self._lock = Lock()
9292

9393
open_req = session_proto.Session.Open.Req()
9494
open_req.database = database
9595
open_req.type = _session_type_proto(session_type)
9696
open_req.options.CopyFrom(grakn_proto_builder.options(options))
9797

98-
self._session_id = self._grpc_stub.session_open(open_req).session_id
98+
self._session_id: bytes = self._grpc_stub.session_open(open_req).session_id
9999
self._is_open = True
100100
self._pulse = self._scheduler.enter(delay=self._PULSE_FREQUENCY_SECONDS, priority=1, action=self._transmit_pulse, argument=())
101101
Thread(target=self._scheduler.run, name="session_pulse_{}".format(self._session_id.hex()), daemon=True).start()
@@ -112,8 +112,11 @@ def is_open(self) -> bool:
112112
return self._is_open
113113

114114
def close(self) -> None:
115+
self._lock.acquire(blocking=True)
115116
if self._is_open:
116117
self._is_open = False
118+
self._client.remove_session(self)
119+
self._lock.release()
117120
self._scheduler.cancel(self._pulse)
118121
self._scheduler.empty()
119122
req = session_proto.Session.Close.Req()
@@ -122,10 +125,15 @@ def close(self) -> None:
122125
self._grpc_stub.session_close(req)
123126
finally:
124127
self._channel.close()
128+
else:
129+
self._lock.release()
125130

126131
def database(self) -> Database:
127132
return self._database
128133

134+
def session_id(self) -> bytes:
135+
return self._session_id
136+
129137
def _transmit_pulse(self) -> None:
130138
if not self._is_open:
131139
return

grakn/rpc/transaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class TransactionType(enum.Enum):
4545

4646
class Transaction:
4747

48-
def __init__(self, address: str, session_id: str, transaction_type: TransactionType, options: GraknOptions = None):
48+
def __init__(self, address: str, session_id: bytes, transaction_type: TransactionType, options: GraknOptions = None):
4949
if not options:
5050
options = GraknOptions.core()
5151
self._transaction_type = transaction_type

tools/behave_rule.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def _rule_implementation(ctx):
110110
cmd += " && rm -rf " + steps_out_dir
111111
cmd += " && mkdir " + steps_out_dir + " && "
112112
cmd += " && ".join(["cp %s %s" % (step_file.path, steps_out_dir) for step_file in ctx.files.steps])
113-
cmd += " && behave %s -D port=$PORT && export RESULT=0 || export RESULT=1" % feats_dir
113+
cmd += " && behave %s --no-capture -D port=$PORT && export RESULT=0 || export RESULT=1" % feats_dir
114114
cmd += """
115115
echo Tests concluded with exit value $RESULT
116116
echo Stopping server.

0 commit comments

Comments
 (0)