Skip to content

Commit

Permalink
sync: feat: syncing etag and overall improvement
Browse files Browse the repository at this point in the history
* chore: don't log the access token from google.

* fix: same device sync.

When same device is initiating the sync just update the remote that.

* refactor: throw early.

When there is network failure or any sort during downloading just throw exception and stop syncing.

* refactor(gdrive): stream the json.

People with over 3k library can't sync because we are hitting OOM ```java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Failed to allocate a 370950192 byte allocation with 25165824 free bytes and 281MB until OOM, target footprint 333990992, growth limit 603979776```. This should fix that for them but only gdrive.

* feat: a demo for sync with new api

* refactor: perform early null checks

* feat: restore even if push failed

* feat: switch to protobuf

* chore: show error notification when sync fails.

* fix: update order by merge

* fix: call pushSyncData twice

* sync: match sync with latest SY changes and remove specific SY code

Co-Authored-By: KaiserBh <[email protected]>
Co-Authored-By: Cologler <[email protected]>
  • Loading branch information
3 people committed Jun 25, 2024
1 parent b786126 commit b83de39
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 312 deletions.
2 changes: 2 additions & 0 deletions app/src/main/java/eu/kanade/domain/sync/SyncPreferences.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class SyncPreferences(
fun clientAPIKey() = preferenceStore.getString("sync_client_api_key", "")
fun lastSyncTimestamp() = preferenceStore.getLong(Preference.appStateKey("last_sync_timestamp"), 0L)

fun lastSyncEtag() = preferenceStore.getString("sync_etag", "")

fun syncInterval() = preferenceStore.getInt("sync_interval", 0)
fun syncService() = preferenceStore.getInt("sync_service", 0)

Expand Down
92 changes: 54 additions & 38 deletions app/src/main/java/eu/kanade/tachiyomi/data/sync/SyncManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import tachiyomi.domain.manga.model.Manga
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.io.File
import java.io.FileOutputStream
import java.io.IOException
import java.util.Date
import kotlin.system.measureTimeMillis
Expand Down Expand Up @@ -90,16 +89,20 @@ class SyncManager(
sourceSettings = syncOptions.sourceSettings,
privateSettings = syncOptions.privateSettings,
)

logcat(LogPriority.DEBUG) { "Begin create backup" }
val backup = Backup(
backupManga = backupCreator.backupMangas(databaseManga, backupOptions),
backupCategories = backupCreator.backupCategories(backupOptions),
backupSources = backupCreator.backupSources(databaseManga),
backupPreferences = backupCreator.backupAppPreferences(backupOptions),
backupSourcePreferences = backupCreator.backupSourcePreferences(backupOptions),
)
logcat(LogPriority.DEBUG) { "End create backup" }

// Create the SyncData object
val syncData = SyncData(
deviceId = syncPreferences.uniqueDeviceID(),
backup = backup,
)

Expand All @@ -126,8 +129,22 @@ class SyncManager(

val remoteBackup = syncService?.doSync(syncData)

if (remoteBackup == null) {
logcat(LogPriority.DEBUG) { "Skip restore due to network issues" }
// should we call showSyncError?
return
}

if (remoteBackup === syncData.backup){
// nothing changed
logcat(LogPriority.DEBUG) { "Skip restore due to remote was overwrite from local" }
syncPreferences.lastSyncTimestamp().set(Date().time)
notifier.showSyncSuccess("Sync completed successfully")
return
}

// Stop the sync early if the remote backup is null or empty
if (remoteBackup?.backupManga?.size == 0) {
if (remoteBackup.backupManga?.size == 0) {
notifier.showSyncError("No data found on remote server.")
return
}
Expand All @@ -140,52 +157,51 @@ class SyncManager(
return
}

if (remoteBackup != null) {
val (filteredFavorites, nonFavorites) = filterFavoritesAndNonFavorites(remoteBackup)
updateNonFavorites(nonFavorites)
val (filteredFavorites, nonFavorites) = filterFavoritesAndNonFavorites(remoteBackup)
updateNonFavorites(nonFavorites)

val newSyncData = backup.copy(
backupManga = filteredFavorites,
backupCategories = remoteBackup.backupCategories,
backupSources = remoteBackup.backupSources,
backupPreferences = remoteBackup.backupPreferences,
backupSourcePreferences = remoteBackup.backupSourcePreferences,
)
val newSyncData = backup.copy(
backupManga = filteredFavorites,
backupCategories = remoteBackup.backupCategories,
backupSources = remoteBackup.backupSources,
backupPreferences = remoteBackup.backupPreferences,
backupSourcePreferences = remoteBackup.backupSourcePreferences,

// It's local sync no need to restore data. (just update remote data)
if (filteredFavorites.isEmpty()) {
// update the sync timestamp
syncPreferences.lastSyncTimestamp().set(Date().time)
notifier.showSyncSuccess("Sync completed successfully")
return
}
)

val backupUri = writeSyncDataToCache(context, newSyncData)
logcat(LogPriority.DEBUG) { "Got Backup Uri: $backupUri" }
if (backupUri != null) {
BackupRestoreJob.start(
context,
backupUri,
sync = true,
options = RestoreOptions(
appSettings = true,
sourceSettings = true,
library = true,
),
)
// It's local sync no need to restore data. (just update remote data)
if (filteredFavorites.isEmpty()) {
// update the sync timestamp
syncPreferences.lastSyncTimestamp().set(Date().time)
notifier.showSyncSuccess("Sync completed successfully")
return
}

// update the sync timestamp
syncPreferences.lastSyncTimestamp().set(Date().time)
} else {
logcat(LogPriority.ERROR) { "Failed to write sync data to file" }
}
val backupUri = writeSyncDataToCache(context, newSyncData)
logcat(LogPriority.DEBUG) { "Got Backup Uri: $backupUri" }
if (backupUri != null) {
BackupRestoreJob.start(
context,
backupUri,
sync = true,
options = RestoreOptions(
appSettings = true,
sourceSettings = true,
library = true,
),
)

// update the sync timestamp
syncPreferences.lastSyncTimestamp().set(Date().time)
} else {
logcat(LogPriority.ERROR) { "Failed to write sync data to file" }
}
}

private fun writeSyncDataToCache(context: Context, backup: Backup): Uri? {
val cacheFile = File(context.cacheDir, "tachiyomi_sync_data.proto.gz")
return try {
FileOutputStream(cacheFile).use { output ->
cacheFile.outputStream().use { output ->
output.write(ProtoBuf.encodeToByteArray(BackupSerializer, backup))
Uri.fromFile(cacheFile)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,20 @@ import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential
import com.google.api.client.googleapis.auth.oauth2.GoogleTokenResponse
import com.google.api.client.http.ByteArrayContent
import com.google.api.client.http.InputStreamContent
import com.google.api.client.http.javanet.NetHttpTransport
import com.google.api.client.json.JsonFactory
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.drive.Drive
import com.google.api.services.drive.DriveScopes
import com.google.api.services.drive.model.File
import eu.kanade.domain.sync.SyncPreferences
import eu.kanade.tachiyomi.data.backup.models.Backup
import kotlinx.coroutines.delay
import kotlinx.serialization.encodeToString
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import kotlinx.serialization.json.encodeToStream
import logcat.LogPriority
import logcat.logcat
import tachiyomi.core.common.i18n.stringResource
Expand All @@ -29,8 +33,9 @@ import tachiyomi.core.common.util.system.logcat
import tachiyomi.i18n.MR
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.PipedInputStream
import java.io.PipedOutputStream
import java.time.Instant
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream
Expand Down Expand Up @@ -64,7 +69,43 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync

private val googleDriveService = GoogleDriveService(context)

override suspend fun beforeSync() {
override suspend fun doSync(syncData: SyncData): Backup? {
beforeSync()

try {
val remoteSData = pullSyncData()

if (remoteSData != null ){
// Get local unique device ID
val localDeviceId = syncPreferences.uniqueDeviceID()
val lastSyncDeviceId = remoteSData.deviceId

// Log the device IDs
logcat(LogPriority.DEBUG, "SyncService") {
"Local device ID: $localDeviceId, Last sync device ID: $lastSyncDeviceId"
}

// check if the last sync was done by the same device if so overwrite the remote data with the local data
return if (lastSyncDeviceId == localDeviceId) {
pushSyncData(syncData)
syncData.backup
}else{
// Merge the local and remote sync data
val mergedSyncData = mergeSyncData(syncData, remoteSData)
pushSyncData(mergedSyncData)
mergedSyncData.backup
}
}

pushSyncData(syncData)
return syncData.backup
} catch (e: Exception) {
logcat(LogPriority.ERROR, "SyncService") { "Error syncing: ${e.message}" }
return null
}
}

private suspend fun beforeSync() {
try {
googleDriveService.refreshToken()
val drive = googleDriveService.driveService
Expand Down Expand Up @@ -120,13 +161,9 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
}
}

override suspend fun pullSyncData(): SyncData? {
val drive = googleDriveService.driveService

if (drive == null) {
logcat(LogPriority.DEBUG) { "Google Drive service not initialized" }
throw Exception(context.stringResource(MR.strings.google_drive_not_signed_in))
}
private fun pullSyncData(): SyncData? {
val drive = googleDriveService.driveService ?:
throw Exception(context.stringResource(MR.strings.google_drive_not_signed_in))

val fileList = getAppDataFileList(drive)
if (fileList.isEmpty()) {
Expand All @@ -137,75 +174,53 @@ class GoogleDriveSyncService(context: Context, json: Json, syncPreferences: Sync
val gdriveFileId = fileList[0].id
logcat(LogPriority.DEBUG) { "Google Drive File ID: $gdriveFileId" }

val outputStream = ByteArrayOutputStream()
try {
drive.files().get(gdriveFileId).executeMediaAndDownloadTo(outputStream)
logcat(LogPriority.DEBUG) { "File downloaded successfully" }
drive.files().get(gdriveFileId).executeMediaAsInputStream().use { inputStream ->
GZIPInputStream(inputStream).use { gzipInputStream ->
return Json.decodeFromStream(SyncData.serializer(), gzipInputStream)
}
}
} catch (e: Exception) {
logcat(LogPriority.ERROR, throwable = e) { "Error downloading file" }
return null
}

return withIOContext {
try {
val gzipInputStream = GZIPInputStream(outputStream.toByteArray().inputStream())
val jsonString = gzipInputStream.bufferedReader().use { it.readText() }
val syncData = json.decodeFromString(SyncData.serializer(), jsonString)
this@GoogleDriveSyncService.logcat(LogPriority.DEBUG) { "JSON deserialized successfully" }
syncData
} catch (e: Exception) {
this@GoogleDriveSyncService.logcat(
LogPriority.ERROR,
throwable = e,
) { "Failed to convert json to sync data with kotlinx.serialization" }
throw Exception(e.message, e)
}
throw Exception("Failed to download sync data: ${e.message}", e)
}
}

override suspend fun pushSyncData(syncData: SyncData) {
val jsonData = json.encodeToString(syncData)
private suspend fun pushSyncData(syncData: SyncData) {
val drive = googleDriveService.driveService
?: throw Exception(context.stringResource(MR.strings.google_drive_not_signed_in))

val fileList = getAppDataFileList(drive)
val byteArrayOutputStream = ByteArrayOutputStream()
withIOContext {
val gzipOutputStream = GZIPOutputStream(byteArrayOutputStream)
gzipOutputStream.write(jsonData.toByteArray(Charsets.UTF_8))
gzipOutputStream.close()
this@GoogleDriveSyncService.logcat(LogPriority.DEBUG) { "JSON serialized successfully" }
}

val byteArrayContent = ByteArrayContent("application/octet-stream", byteArrayOutputStream.toByteArray())
PipedOutputStream().use { pos ->
PipedInputStream(pos).use { pis ->
withIOContext {
// Start a coroutine or a background thread to write JSON to the PipedOutputStream
launch {
GZIPOutputStream(pos).use { gzipOutputStream ->
Json.encodeToStream(SyncData.serializer(), syncData, gzipOutputStream)
}
}

try {
if (fileList.isNotEmpty()) {
// File exists, so update it
val fileId = fileList[0].id
drive.files().update(fileId, null, byteArrayContent).execute()
logcat(LogPriority.DEBUG) { "Updated existing sync data file in Google Drive with file ID: $fileId" }
} else {
// File doesn't exist, so create it
val fileMetadata = File().apply {
name = remoteFileName
mimeType = "application/gzip"
parents = listOf("appDataFolder")
if (fileList.isNotEmpty()) {
val fileId = fileList[0].id
val mediaContent = InputStreamContent("application/gzip", pis)
drive.files().update(fileId, null, mediaContent).execute()
logcat(LogPriority.DEBUG) { "Updated existing sync data file in Google Drive with file ID: $fileId" }
} else {
val fileMetadata = File().apply {
name = remoteFileName
mimeType = "application/gzip"
parents = listOf("appDataFolder")
}
val mediaContent = InputStreamContent("application/gzip", pis)
val uploadedFile = drive.files().create(fileMetadata, mediaContent)
.setFields("id")
.execute()
logcat(LogPriority.DEBUG) { "Created new sync data file in Google Drive with file ID: ${uploadedFile.id}" }
}
}
val uploadedFile = drive.files().create(fileMetadata, byteArrayContent)
.setFields("id")
.execute()

logcat(
LogPriority.DEBUG,
) { "Created new sync data file in Google Drive with file ID: ${uploadedFile.id}" }
}

// Data has been successfully pushed or updated, delete the lock file
deleteLockFile(drive)
} catch (e: Exception) {
logcat(LogPriority.ERROR, throwable = e) { "Failed to push or update sync data" }
throw Exception(context.stringResource(MR.strings.error_uploading_sync_data) + ": ${e.message}", e)
}
}

Expand Down Expand Up @@ -392,7 +407,6 @@ class GoogleDriveService(private val context: Context) {
}
internal suspend fun refreshToken() = withIOContext {
val refreshToken = syncPreferences.googleDriveRefreshToken().get()
val accessToken = syncPreferences.googleDriveAccessToken().get()

val jsonFactory: JsonFactory = JacksonFactory.getDefaultInstance()
val secrets = GoogleClientSecrets.load(
Expand All @@ -412,16 +426,12 @@ class GoogleDriveService(private val context: Context) {

credential.refreshToken = refreshToken

this@GoogleDriveService.logcat(LogPriority.DEBUG) { "Refreshing access token with: $refreshToken" }

try {
credential.refreshToken()
val newAccessToken = credential.accessToken
// Save the new access token
syncPreferences.googleDriveAccessToken().set(newAccessToken)
setupGoogleDriveService(newAccessToken, credential.refreshToken)
this@GoogleDriveService
.logcat(LogPriority.DEBUG) { "Google Access token refreshed old: $accessToken new: $newAccessToken" }
} catch (e: TokenResponseException) {
if (e.details.error == "invalid_grant") {
// The refresh token is invalid, prompt the user to sign in again
Expand Down Expand Up @@ -473,9 +483,10 @@ class GoogleDriveService(private val context: Context) {
}

/**
* Handles the authorization code returned after the user has granted the application permission to access their Google Drive account.
* It obtains the access token and refresh token using the authorization code, saves the tokens to the SyncPreferences,
* sets up the Google Drive service using the obtained tokens, and initializes the service.
* Handles the authorization code returned after the user has granted the application permission to access their
* Google Drive account.
* It obtains the access token and refresh token using the authorization code, saves the tokens to the
* SyncPreferences, sets up the Google Drive service using the obtained tokens, and initializes the service.
* @param authorizationCode The authorization code obtained from the OAuthCallbackServer.
* @param activity The current activity.
* @param onSuccess A callback function to be called on successful authorization.
Expand Down
Loading

0 comments on commit b83de39

Please sign in to comment.