Skip to content

Commit

Permalink
Updated logic to ensure handleQueue is called.
Browse files Browse the repository at this point in the history
  • Loading branch information
mohitkumaragarwal committed Nov 5, 2024
1 parent 7187fd0 commit 4d5f08f
Showing 1 changed file with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import json
import asyncio
import queue
from threading import Thread
from Python.Serialize import serializeClassInstance, serializeValue

class Short:
Expand All @@ -23,18 +21,25 @@ class ResultEventHandler:
def __init__(self, websocket, server_url):
self.websocket = websocket
self.server_url = server_url
self.queue = queue.Queue()
self.queue = asyncio.Queue()
self.loop = asyncio.get_event_loop()
self.loop.create_task(self.handleQueue())

async def handleQueue(self):
while True:
item = await self.loop.run_in_executor(None, self.queue.get)
print("handleQueue Running...")
item = await self.queue.get()
if item is None:
break
await self.websocket.send(item)
try:
await self.websocket.send(item)
except websockets.exceptions.ConnectionClosedOK:
print("WebSocket connection closed normally.")
break
except Exception as e:
print(f"Error sending message: {e}")

async def threaded_websocket(self, tmp):
async def process_event(self, tmp):
stage = "1"
try:
arg = str(serializeValue(tmp))
Expand All @@ -47,15 +52,12 @@ async def threaded_websocket(self, tmp):
tmp2 = json.dumps(returnValue)
stage = "3"
print("start " + tmp.Message.Text)
self.queue.put(tmp2)
await self.queue.put(tmp2)
stage = "4"
print("end " + tmp.Message.Text)
except Exception as e:
print("--- TW Exception: " + stage + " --- " + str(e))

def wrap_async_func(self, arg):
asyncio.run_coroutine_threadsafe(self.threaded_websocket(arg), self.loop)

def event_notification(self, event):
print("RESULT EVENT RECEIVED: " + event.Message.Text)

Expand All @@ -65,4 +67,14 @@ def event_notification(self, event):
# to the webpage needs to be done asynchronously via a separate thread.
# Therefore we ensure we send JUST the result part (and the eventType for identification, and the message)

self.wrap_async_func(filteredEvent)
asyncio.create_task(self.process_event(filteredEvent))

# Ensure the event loop is running
if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()

0 comments on commit 4d5f08f

Please sign in to comment.