Skip to content

Commit 59d090a

Browse files
authored
tmp: disable propagation on stop (#43)
1 parent 2497cf2 commit 59d090a

File tree

2 files changed

+28
-11
lines changed

2 files changed

+28
-11
lines changed

jupyter_nbmodel_client/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
#
33
# BSD 3-Clause License
44

5-
VERSION = "0.13.4"
5+
VERSION = "0.13.5"

jupyter_nbmodel_client/client.py

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ class NbModelClient(NotebookModel):
135135
user_agent: str = f"Datalayer-NbModelClient/{VERSION}"
136136
"""User agent used to identify the nbmodel client type in the awareness state."""
137137

138+
138139
def __init__(
139140
self,
140141
websocket_url: str,
@@ -156,6 +157,7 @@ def __init__(
156157
self.__run: asyncio.Task | None = None
157158
self.__is_running = False
158159

160+
159161
@property
160162
def path(self) -> str:
161163
"""Document path relative to the server root path."""
@@ -178,6 +180,7 @@ def username(self) -> str:
178180
"""Client owner username."""
179181
return self._username
180182

183+
181184
def __del__(self) -> None:
182185
if self.__run is not None:
183186
self.__run.cancel() # Theoritically, this should be awaited
@@ -189,6 +192,7 @@ async def __aenter__(self) -> "NbModelClient":
189192
async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
190193
await self.stop()
191194

195+
192196
async def run(self) -> None:
193197
"""Run the nbmodel client."""
194198
self._log.info("Starting the nbmodel client…")
@@ -256,48 +260,62 @@ async def run(self) -> None:
256260
finally:
257261
self._log.info("Stopping the nbmodel client…")
258262

259-
# Stop listening to incoming messages
263+
# Stop listening to incoming messages.
264+
self._log.debug("Stopping listening to incoming messages…")
260265
if listener.cancel():
261266
await asyncio.wait([listener])
262267

263268
# Stop listening for awareness updates
269+
self._log.debug("Stopping listening for awareness update…")
264270
cast(Awareness, self._doc.awareness).unobserve(awareness_observer)
265271

266272
# Stop listening for document changes
273+
self._log.debug("Stopping listening for document changes…")
267274
try:
268275
self._doc.ydoc.unobserve(doc_observer)
269276
except ValueError as e:
270277
if str(e) != "list.remove(x): x not in list":
271278
self._log.error("Failed to unobserve the notebook model.", exc_info=e)
272279

273280
# Try to propagate the last changes
281+
self._log.debug("Trying to propagate the last changes…")
274282
if not sender.done():
283+
# TODO This is not working as expected, the messages are not sent and hangs indefinitely.
284+
# This is probably due to the fact that the sender task is not awaited.
285+
# We should probably use a timeout here to avoid hanging indefinitely.
286+
# If the sender is not done, we wait for the messages queue to be empty.
287+
# If the messages queue is not empty, we wait for the sender to finish sending the messages.
288+
# This is to ensure that all the changes are propagated before closing the websocket.
275289
if not messages_queue.empty():
276-
self._log.debug("Propagating the %s last changes…", messages_queue.qsize())
277-
await asyncio.shield(messages_queue.join())
290+
self._log.warning("Propagation disabled for now - Propagating the %s last changes…", messages_queue.qsize())
291+
# await asyncio.shield(messages_queue.join())
278292

279293
# Stop forwarding changes
294+
self._log.debug("Stopping forwarding changes…")
280295
if sender.cancel():
281-
self._log.debug("Stop forwarding changes…")
296+
self._log.debug("Stopping forwarding changes…")
282297
await asyncio.wait([sender])
283298

284299
# Reset the model
300+
self._log.debug("Resetting the model…")
285301
self._reset_y_model()
286302
self.__synced.clear()
287303

288304
# Close the websocket
305+
self._log.debug("Closing the websocket…")
289306
if websocket:
290307
try:
291308
await websocket.close()
292309
except BaseException as e:
293310
self._log.error("Unable to close the websocket connection.", exc_info=e)
294311
raise
295312
finally:
296-
self._log.debug("Websocket connection closed.")
313+
self._log.info("Websocket connection closed.")
297314
websocket = None
298315

299316
self.__is_running = False
300317

318+
301319
def get_local_client_id(self) -> int:
302320
"""Get the local client ID.
303321
@@ -338,6 +356,7 @@ def set_local_state_field(self, key: str, value: Any) -> None:
338356
"""
339357
cast(Awareness, self._doc.awareness).set_local_state_field(key, value)
340358

359+
341360
async def start(self) -> None:
342361
"""Start the nbmodel client."""
343362
if self.__run is not None:
@@ -358,21 +377,19 @@ def callback(_: asyncio.Task) -> None:
358377
if not self.synced:
359378
self._log.warning("Document %s not yet synced.", self._path)
360379

380+
361381
async def stop(self) -> None:
362382
"""Stop and reset the nbmodel client."""
363383
if self.__run is not None:
364384
if self.__run.cancel():
365-
# TODO without timeout, stop() sometimes hangs indefinitely.
366-
# try:
367-
# await asyncio.wait_for(self.__run, timeout=1.0)
368-
# except TimeoutError:
369-
# self._log.warning('Timeout with stopping the nbmodel client "%s".', self._path)
370385
await asyncio.wait([self.__run])
371386

387+
372388
async def wait_until_synced(self) -> None:
373389
"""Wait until the model is synced."""
374390
await self.__synced.wait()
375391

392+
376393
async def _on_message(self, websocket: ClientConnection, message: bytes) -> None:
377394
if message[0] == YMessageType.SYNC:
378395
self._log.debug(

0 commit comments

Comments
 (0)