Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit f93aad4

Browse files
Protocol versioning (#297)
## What is the goal of this PR? We use a new protocol API to perform a "connection open". This API does server-side protocol version compatibility checks, and replaces our previous need to get all databases to check that the connection is available. This API is called transparently during the construction of both the Core and Cluster clients. ## What are the changes implemented in this PR? * Refactor Stubs to call `connectionOpen` in their constructors * Throw a useful message if the server returns an `unimplemented` error
1 parent 6155300 commit f93aad4

File tree

12 files changed

+64
-32
lines changed

12 files changed

+64
-32
lines changed

dependencies/vaticle/artifacts.bzl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def vaticle_typedb_artifacts():
2929
artifact_name = "typedb-server-{platform}-{version}.{ext}",
3030
tag_source = deployment["artifact.release"],
3131
commit_source = deployment["artifact.snapshot"],
32-
commit = "cb119c536c44275f58df7b54da033f8fe076cac5",
32+
commit = "ad86a23fb78dda097a8b61b9d01ef68751cb14c1",
3333
)
3434

3535
def vaticle_typedb_cluster_artifacts():
@@ -39,5 +39,5 @@ def vaticle_typedb_cluster_artifacts():
3939
artifact_name = "typedb-cluster-all-{platform}-{version}.{ext}",
4040
tag_source = deployment_private["artifact.release"],
4141
commit_source = deployment_private["artifact.snapshot"],
42-
commit = "8044753056dd20b8e6bec6f62036eb0003d4ba06",
42+
commit = "6e00e0bff33c8c9c90f6393620dd30a9526748c3",
4343
)

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
## Dependencies
3737

38-
typedb-protocol==2.18.0.dev2
38+
typedb-protocol==2.18.0.dev3
3939
grpcio>=1.43.0,<2
4040
protobuf>=3.15.6,<4
4141
parse==1.18.0

typedb/common/exception.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
# specific language governing permissions and limitations
1919
# under the License.
2020
#
21-
from typing import Optional, Union, Any
21+
from typing import Union, Any
2222

2323
from grpc import RpcError, Call, StatusCode
2424

@@ -38,7 +38,9 @@ def __init__(self, msg: Union["ErrorMessage", str], cause: BaseException = None,
3838

3939
@staticmethod
4040
def of_rpc(rpc_error: Union[RpcError, Call]) -> "TypeDBClientException":
41-
if rpc_error.code() in [StatusCode.UNAVAILABLE, StatusCode.UNKNOWN] or "Received RST_STREAM" in str(rpc_error):
41+
if rpc_error.code() is StatusCode.UNIMPLEMENTED:
42+
return TypeDBClientException(msg=RPC_METHOD_UNAVAILABLE, cause=rpc_error.details())
43+
elif rpc_error.code() in [StatusCode.UNAVAILABLE, StatusCode.UNKNOWN] or "Received RST_STREAM" in str(rpc_error):
4244
return TypeDBClientException(msg=UNABLE_TO_CONNECT, cause=rpc_error)
4345
elif rpc_error.code() is StatusCode.INTERNAL and "[RPL01]" in str(rpc_error):
4446
return TypeDBClientException(msg=CLUSTER_REPLICA_NOT_PRIMARY, cause=None)
@@ -75,24 +77,25 @@ def __init__(self, code: int, message: str):
7577
super(ClientErrorMessage, self).__init__(code_prefix="CLI", code_number=code, message_prefix="Client Error", message_body=message)
7678

7779

78-
CLIENT_CLOSED = ClientErrorMessage(1, "The client has been closed and no further operation is allowed.")
79-
SESSION_CLOSED = ClientErrorMessage(2, "The session has been closed and no further operation is allowed.")
80-
TRANSACTION_CLOSED = ClientErrorMessage(3, "The transaction has been closed and no further operation is allowed.")
81-
TRANSACTION_CLOSED_WITH_ERRORS = ClientErrorMessage(4, "The transaction has been closed with error(s):\n%s.")
82-
UNABLE_TO_CONNECT = ClientErrorMessage(5, "Unable to connect to TypeDB server.")
83-
NEGATIVE_VALUE_NOT_ALLOWED = ClientErrorMessage(6, "Value cannot be less than 1, was: '%d'.")
84-
MISSING_DB_NAME = ClientErrorMessage(7, "Database name cannot be empty.")
85-
DB_DOES_NOT_EXIST = ClientErrorMessage(8, "The database '%s' does not exist.")
86-
MISSING_RESPONSE = ClientErrorMessage(9, "Unexpected empty response for request ID '%s'.")
87-
UNKNOWN_REQUEST_ID = ClientErrorMessage(10, "Received a response with unknown request id '%s':\n%s")
88-
CLUSTER_NO_PRIMARY_REPLICA_YET = ClientErrorMessage(11, "No replica has been marked as the primary replica for latest known term '%d'.")
89-
CLUSTER_UNABLE_TO_CONNECT = ClientErrorMessage(12, "Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '%s'.")
90-
CLUSTER_REPLICA_NOT_PRIMARY = ClientErrorMessage(13, "The replica is not the primary replica.")
91-
CLUSTER_ALL_NODES_FAILED = ClientErrorMessage(14, "Attempted connecting to all cluster members, but the following errors occurred: \n%s")
92-
CLUSTER_USER_DOES_NOT_EXIST = ClientErrorMessage(15, "The user '%s' does not exist.")
93-
CLUSTER_TOKEN_CREDENTIAL_INVALID = ClientErrorMessage(16, "Invalid token credential.")
94-
CLUSTER_INVALID_ROOT_CA_PATH = ClientErrorMessage(17, "The provided Root CA path '%s' does not exist.")
95-
CLUSTER_CLIENT_CALLED_WITH_STRING = ClientErrorMessage(18, "The first argument of TypeDBClient.cluster() must be a List of server addresses to connect to. It was called with a string, not a List, which is not allowed.")
80+
RPC_METHOD_UNAVAILABLE = ClientErrorMessage(1, "The server does not support this method, please check the client-server compatibility:\n'%s'.")
81+
CLIENT_NOT_OPEN = ClientErrorMessage(2, "The client is not open.")
82+
SESSION_CLOSED = ClientErrorMessage(3, "The session has been closed and no further operation is allowed.")
83+
TRANSACTION_CLOSED = ClientErrorMessage(4, "The transaction has been closed and no further operation is allowed.")
84+
TRANSACTION_CLOSED_WITH_ERRORS = ClientErrorMessage(5, "The transaction has been closed with error(s):\n%s.")
85+
UNABLE_TO_CONNECT = ClientErrorMessage(6, "Unable to connect to TypeDB server.")
86+
NEGATIVE_VALUE_NOT_ALLOWED = ClientErrorMessage(7, "Value cannot be less than 1, was: '%d'.")
87+
MISSING_DB_NAME = ClientErrorMessage(8, "Database name cannot be empty.")
88+
DB_DOES_NOT_EXIST = ClientErrorMessage(9, "The database '%s' does not exist.")
89+
MISSING_RESPONSE = ClientErrorMessage(10, "Unexpected empty response for request ID '%s'.")
90+
UNKNOWN_REQUEST_ID = ClientErrorMessage(11, "Received a response with unknown request id '%s':\n%s")
91+
CLUSTER_NO_PRIMARY_REPLICA_YET = ClientErrorMessage(12, "No replica has been marked as the primary replica for latest known term '%d'.")
92+
CLUSTER_UNABLE_TO_CONNECT = ClientErrorMessage(13, "Unable to connect to TypeDB Cluster. Attempted connecting to the cluster members, but none are available: '%s'.")
93+
CLUSTER_REPLICA_NOT_PRIMARY = ClientErrorMessage(14, "The replica is not the primary replica.")
94+
CLUSTER_ALL_NODES_FAILED = ClientErrorMessage(15, "Attempted connecting to all cluster members, but the following errors occurred: \n%s")
95+
CLUSTER_USER_DOES_NOT_EXIST = ClientErrorMessage(16, "The user '%s' does not exist.")
96+
CLUSTER_TOKEN_CREDENTIAL_INVALID = ClientErrorMessage(17, "Invalid token credential.")
97+
CLUSTER_INVALID_ROOT_CA_PATH = ClientErrorMessage(18, "The provided Root CA path '%s' does not exist.")
98+
CLUSTER_CLIENT_CALLED_WITH_STRING = ClientErrorMessage(19, "The first argument of TypeDBClient.cluster() must be a List of server addresses to connect to. It was called with a string, not a List, which is not allowed.")
9699

97100

98101
class ConceptErrorMessage(ErrorMessage):

typedb/common/rpc/request_builder.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import typedb_protocol.cluster.cluster_server_pb2 as cluster_server_proto
2626
import typedb_protocol.cluster.cluster_user_pb2 as cluster_user_proto
2727
import typedb_protocol.common.concept_pb2 as concept_proto
28+
import typedb_protocol.common.connection_pb2 as connection_proto
29+
import typedb_protocol.common.version_pb2 as version_proto
2830
import typedb_protocol.common.logic_pb2 as logic_proto
2931
import typedb_protocol.common.options_pb2 as options_proto
3032
import typedb_protocol.common.query_pb2 as query_proto
@@ -35,6 +37,13 @@
3537
from typedb.common.exception import TypeDBClientException, GET_HAS_WITH_MULTIPLE_FILTERS
3638
from typedb.common.label import Label
3739

40+
# Connection
41+
42+
def connection_open_req():
43+
req = connection_proto.Connection.Open.Req()
44+
req.version = version_proto.Version.VERSION
45+
return req
46+
3847

3948
# CoreDatabaseManager
4049

typedb/common/rpc/stub.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from typing import Iterator
2424
from typing import TypeVar, Callable
2525

26+
import typedb_protocol.common.connection_pb2 as connection_proto
2627
import typedb_protocol.common.session_pb2 as session_proto
2728
import typedb_protocol.common.transaction_pb2 as transaction_proto
2829
import typedb_protocol.core.core_database_pb2 as core_database_proto

typedb/connection/client.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from typedb.api.connection.options import TypeDBOptions
2828
from typedb.api.connection.session import SessionType
2929
from typedb.common.concurrent.scheduled_executor import ScheduledExecutor
30+
from typedb.common.exception import CLIENT_NOT_OPEN, TypeDBClientException
3031
from typedb.common.rpc.stub import TypeDBStub
3132
from typedb.connection.database_manager import _TypeDBDatabaseManagerImpl
3233
from typedb.connection.session import _TypeDBSessionImpl
@@ -42,11 +43,13 @@ def __init__(self, address: str, parallelisation: int = 2):
4243
self._transmitter = RequestTransmitter(parallelisation)
4344
self._sessions: Dict[bytes, _TypeDBSessionImpl] = {}
4445
self._sessions_lock = Lock()
45-
self._is_open = True
4646
self._pulse_executor = ScheduledExecutor()
4747
self._pulse_executor.schedule_at_fixed_rate(interval=self._PULSE_INTERVAL_SECONDS, action=self._transmit_pulses)
4848

4949
def session(self, database: str, session_type: SessionType, options=None) -> _TypeDBSessionImpl:
50+
if not self.is_open():
51+
raise TypeDBClientException.of(CLIENT_NOT_OPEN)
52+
5053
if not options:
5154
options = TypeDBOptions.core()
5255
session = _TypeDBSessionImpl(self, database, session_type, options)

typedb/connection/cluster/client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from typedb.connection.cluster.stub import _ClusterServerStub
3333
from typedb.connection.cluster.user_manager import _ClusterUserManager
3434
from typedb.common.rpc.request_builder import cluster_server_manager_all_req
35-
from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_UNABLE_TO_CONNECT
35+
from typedb.common.exception import TypeDBClientException, UNABLE_TO_CONNECT, CLUSTER_UNABLE_TO_CONNECT, CLIENT_NOT_OPEN
3636

3737

3838
class _ClusterClient(TypeDBClusterClient):
@@ -62,9 +62,12 @@ def _fetch_server_addresses(self, addresses: Iterable[str]) -> Set[str]:
6262
raise TypeDBClientException.of(CLUSTER_UNABLE_TO_CONNECT, ",".join(addresses))
6363

6464
def session(self, database: str, session_type: SessionType, options=None) -> _ClusterSession:
65+
if not self.is_open():
66+
raise TypeDBClientException.of(CLIENT_NOT_OPEN)
6567
if not options:
6668
options = TypeDBOptions.cluster()
67-
return self._session_any_replica(database, session_type, options) if getattr(options, "read_any_replica", False) else self._session_primary_replica(database, session_type, options)
69+
return self._session_any_replica(database, session_type, options) if getattr(options, "read_any_replica", False) \
70+
else self._session_primary_replica(database, session_type, options)
6871

6972
def _session_primary_replica(self, database: str, session_type: SessionType, options=None) -> _ClusterSession:
7073
return _OpenSessionFailsafeTask(database, session_type, options, self).run_primary_replica()

typedb/connection/cluster/server_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(self, address: str, credential: TypeDBCredential, parallelisation:
4040
self._channel_credentials = grpc.ssl_channel_credentials()
4141
self._channel, self._stub = self.new_channel_and_stub()
4242
self._databases = _TypeDBDatabaseManagerImpl(self.stub())
43+
self._is_open = True
4344

4445
def databases(self) -> _TypeDBDatabaseManagerImpl:
4546
return self._databases

typedb/connection/cluster/stub.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333

3434
from typedb.api.connection.credential import TypeDBCredential
3535
from typedb.common.exception import CLUSTER_TOKEN_CREDENTIAL_INVALID, TypeDBClientException, UNABLE_TO_CONNECT
36-
from typedb.common.rpc.request_builder import cluster_user_token_req
36+
from typedb.common.rpc.request_builder import cluster_user_token_req, connection_open_req
3737
from typedb.common.rpc.stub import TypeDBStub
3838

3939
T = TypeVar('T')
@@ -47,8 +47,10 @@ def __init__(self, channel: Channel, credential: TypeDBCredential):
4747
self._channel = channel
4848
self._stub = core_service_proto.TypeDBStub(channel)
4949
self._cluster_stub = cluster_service_proto.TypeDBClusterStub(channel)
50+
5051
self._token = None
5152
try:
53+
self._stub.connection_open(connection_open_req())
5254
res = self._cluster_stub.user_token(cluster_user_token_req(self._credential.username()))
5355
self._token = res.token
5456
except RpcError as e:

typedb/connection/core/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
from grpc import Channel, insecure_channel
2323

24+
from typedb.common.exception import TypeDBClientException, CLIENT_NOT_OPEN
2425
from typedb.common.rpc.stub import TypeDBStub
2526
from typedb.connection.client import _TypeDBClientImpl
2627
from typedb.connection.core.stub import _CoreStub
@@ -33,8 +34,11 @@ def __init__(self, address: str, parallelisation: int = 2):
3334
super(_CoreClient, self).__init__(address, parallelisation)
3435
self._channel, self._stub = self.new_channel_and_stub()
3536
self._databases = _TypeDBDatabaseManagerImpl(self.stub())
37+
self._is_open = True
3638

3739
def databases(self) -> _TypeDBDatabaseManagerImpl:
40+
if not self.is_open():
41+
raise TypeDBClientException.of(CLIENT_NOT_OPEN)
3842
return self._databases
3943

4044
def channel(self) -> Channel:

0 commit comments

Comments
 (0)