From 93fcc3b8052ae5de8cb69760e87caef3d7fb48a8 Mon Sep 17 00:00:00 2001
From: Brian Vaughn <brian.david.vaughn@gmail.com>
Date: Tue, 26 Sep 2023 21:07:59 -0400
Subject: [PATCH] Fix edge case bug in  that sometimes caused updates to be
 dropped

---
 .../src/hooks/useStreamingValue.test.tsx      | 19 ++-----
 .../suspense/src/hooks/useStreamingValue.ts   | 50 ++-----------------
 2 files changed, 8 insertions(+), 61 deletions(-)

diff --git a/packages/suspense/src/hooks/useStreamingValue.test.tsx b/packages/suspense/src/hooks/useStreamingValue.test.tsx
index 196ab9a..fbcbef2 100644
--- a/packages/suspense/src/hooks/useStreamingValue.test.tsx
+++ b/packages/suspense/src/hooks/useStreamingValue.test.tsx
@@ -22,11 +22,9 @@ describe("useStreamingValue", () => {
   let lastRendered: StreamingValuePartial<any, any> | undefined = undefined;
 
   function Component({
-    simulateRenderDuration = 0,
     streaming,
     throttleUpdatesBy,
   }: {
-    simulateRenderDuration?: number;
     streaming: StreamingValue<any, any>;
     throttleUpdatesBy?: number;
   }): any {
@@ -34,10 +32,6 @@ describe("useStreamingValue", () => {
       throttleUpdatesBy,
     });
 
-    if (simulateRenderDuration > 0) {
-      jest.advanceTimersByTime(simulateRenderDuration);
-    }
-
     return null;
   }
 
@@ -164,16 +158,9 @@ describe("useStreamingValue", () => {
       const container = document.createElement("div");
       const root = createRoot(container);
 
-      // Simulate a render that takes longer than the throttle-by duration.
-      // This ensures that the throttling respects commit boundaries
-      // to avoid overwhelming the scheduler.
       act(() => {
         root.render(
-          <Component
-            simulateRenderDuration={500}
-            streaming={streaming}
-            throttleUpdatesBy={100}
-          />
+          <Component streaming={streaming} throttleUpdatesBy={100} />
         );
       });
 
@@ -190,7 +177,9 @@ describe("useStreamingValue", () => {
       });
       expect(lastRendered?.value).toEqual([1]);
 
-      jest.advanceTimersByTime(50);
+      act(() => {
+        jest.advanceTimersByTime(50);
+      });
 
       act(() => {
         options.update([1, 2, 3], 0.75);
diff --git a/packages/suspense/src/hooks/useStreamingValue.ts b/packages/suspense/src/hooks/useStreamingValue.ts
index 061fe2a..1e7a40c 100644
--- a/packages/suspense/src/hooks/useStreamingValue.ts
+++ b/packages/suspense/src/hooks/useStreamingValue.ts
@@ -1,23 +1,19 @@
-import { useCallback, useEffect, useRef, useSyncExternalStore } from "react";
+import { useCallback, useRef, useSyncExternalStore } from "react";
 import { STATUS_PENDING } from "../constants";
 
 import { StreamingValue } from "../types";
+import { throttle } from "../utils/throttle";
 
 export type StreamingValuePartial<Value, AdditionalData> = Pick<
   StreamingValue<Value, AdditionalData>,
   "complete" | "data" | "error" | "progress" | "status" | "value"
 >;
 
-type Noop = () => void;
-type CallbackWrapper = Noop & { hold: Noop; release: Noop };
-
 export function useStreamingValue<Value, AdditionalData = undefined>(
   streamingValues: StreamingValue<Value, AdditionalData>,
   options: { throttleUpdatesBy?: number } = {}
 ): StreamingValuePartial<Value, AdditionalData> {
-  const { throttleUpdatesBy = 100 } = options;
-
-  const callbackWrapperRef = useRef<CallbackWrapper | null>(null);
+  const { throttleUpdatesBy = 150 } = options;
 
   const ref = useRef<StreamingValuePartial<Value, AdditionalData>>({
     complete: false,
@@ -54,54 +50,16 @@ export function useStreamingValue<Value, AdditionalData = undefined>(
     (callback: () => void) => {
       const callbackWrapper = throttle(() => {
         callback();
-        callbackWrapper.hold();
-      });
-
-      callbackWrapperRef.current = callbackWrapper;
+      }, throttleUpdatesBy);
 
       return streamingValues.subscribe(callbackWrapper);
     },
     [streamingValues.subscribe]
   );
 
-  useEffect(() => {
-    const callbackWrapper = callbackWrapperRef.current;
-    if (callbackWrapper) {
-      setTimeout(callbackWrapper.release, throttleUpdatesBy);
-    }
-  });
-
   return useSyncExternalStore<StreamingValuePartial<Value, AdditionalData>>(
     throttledSubscribe,
     getValue,
     getValue
   );
 }
-
-function throttle(callback: Noop): CallbackWrapper {
-  let hold = false;
-  let pending = false;
-
-  const throttled = () => {
-    if (hold) {
-      pending = true;
-    } else {
-      callback();
-    }
-  };
-
-  throttled.hold = () => {
-    hold = true;
-  };
-
-  throttled.release = () => {
-    hold = false;
-
-    if (pending) {
-      pending = false;
-      callback();
-    }
-  };
-
-  return throttled;
-}