From 2454757b24fe4e08f75fad79854eb65cd03b5d82 Mon Sep 17 00:00:00 2001 From: Alex Baker Date: Mon, 13 Jul 2020 14:13:22 -0500 Subject: [PATCH] Use coroutines in GoogleTaskSynchronizer --- .../astrid/gtasks/api/GtasksInvoker.kt | 109 +++++++++--------- .../tasks/gtasks/GoogleTaskSynchronizer.kt | 29 ++--- app/src/main/java/org/tasks/jobs/SyncWork.kt | 42 ++++--- 3 files changed, 93 insertions(+), 87 deletions(-) diff --git a/app/src/main/java/com/todoroo/astrid/gtasks/api/GtasksInvoker.kt b/app/src/main/java/com/todoroo/astrid/gtasks/api/GtasksInvoker.kt index 727d17b07..aa0eb8185 100644 --- a/app/src/main/java/com/todoroo/astrid/gtasks/api/GtasksInvoker.kt +++ b/app/src/main/java/com/todoroo/astrid/gtasks/api/GtasksInvoker.kt @@ -12,8 +12,11 @@ import com.google.api.services.tasks.model.Task import com.google.api.services.tasks.model.TaskList import com.google.api.services.tasks.model.TaskLists import dagger.hilt.android.qualifiers.ApplicationContext +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.withContext import org.tasks.BuildConfig import org.tasks.DebugNetworkInterceptor +import org.tasks.data.runBlocking import org.tasks.gtasks.GoogleAccountManager import org.tasks.preferences.Preferences import timber.log.Timber @@ -72,14 +75,13 @@ class GtasksInvoker { } @Throws(IOException::class) - fun allGtaskLists(pageToken: String?): TaskLists? { - return execute(service!!.tasklists().list().setMaxResults(100L).setPageToken(pageToken)) - } + suspend fun allGtaskLists(pageToken: String?): TaskLists? = + execute(service!!.tasklists().list().setMaxResults(100L).setPageToken(pageToken)) @Throws(IOException::class) - fun getAllGtasksFromListId( - listId: String?, lastSyncDate: Long, pageToken: String?): com.google.api.services.tasks.model.Tasks? { - return execute( + suspend fun getAllGtasksFromListId( + listId: String?, lastSyncDate: Long, pageToken: String?): com.google.api.services.tasks.model.Tasks? = + execute( service!! .tasks() .list(listId) @@ -89,12 +91,11 @@ class GtasksInvoker { .setPageToken(pageToken) .setUpdatedMin( GtasksApiUtilities.unixTimeToGtasksCompletionTime(lastSyncDate).toStringRfc3339())) - } @Throws(IOException::class) - fun getAllPositions( - listId: String?, pageToken: String?): com.google.api.services.tasks.model.Tasks { - return execute( + suspend fun getAllPositions( + listId: String?, pageToken: String?): com.google.api.services.tasks.model.Tasks = + execute( service!! .tasks() .list(listId) @@ -103,27 +104,28 @@ class GtasksInvoker { .setShowHidden(false) .setPageToken(pageToken) .setFields("items(id,parent,position),nextPageToken"))!! - } @Throws(IOException::class) - fun createGtask( - listId: String?, task: Task?, parent: String?, previous: String?): Task? { - return execute(service!!.tasks().insert(listId, task).setParent(parent).setPrevious(previous)) - } + suspend fun createGtask( + listId: String?, task: Task?, parent: String?, previous: String?): Task? = + execute(service!!.tasks().insert(listId, task).setParent(parent).setPrevious(previous)) @Throws(IOException::class) - fun updateGtask(listId: String?, task: Task) { - execute(service!!.tasks().update(listId, task.id, task)) - } + suspend fun updateGtask(listId: String?, task: Task) = + execute(service!!.tasks().update(listId, task.id, task)) @Throws(IOException::class) - fun moveGtask(listId: String?, taskId: String?, parentId: String?, previousId: String?): Task? { - return execute( - service!!.tasks().move(listId, taskId).setParent(parentId).setPrevious(previousId)) - } + suspend fun moveGtask( + listId: String?, taskId: String?, parentId: String?, previousId: String?): Task? = + execute( + service!! + .tasks() + .move(listId, taskId) + .setParent(parentId) + .setPrevious(previousId)) @Throws(IOException::class) - fun deleteGtaskList(listId: String?) { + fun deleteGtaskList(listId: String?) = runBlocking { try { execute(service!!.tasklists().delete(listId)) } catch (ignored: HttpNotFoundException) { @@ -131,17 +133,17 @@ class GtasksInvoker { } @Throws(IOException::class) - fun renameGtaskList(listId: String?, title: String?): TaskList? { - return execute(service!!.tasklists().patch(listId, TaskList().setTitle(title))) + fun renameGtaskList(listId: String?, title: String?): TaskList? = runBlocking { + execute(service!!.tasklists().patch(listId, TaskList().setTitle(title))) } @Throws(IOException::class) - fun createGtaskList(title: String?): TaskList? { - return execute(service!!.tasklists().insert(TaskList().setTitle(title))) + fun createGtaskList(title: String?): TaskList? = runBlocking { + execute(service!!.tasklists().insert(TaskList().setTitle(title))) } @Throws(IOException::class) - fun deleteGtask(listId: String?, taskId: String?) { + suspend fun deleteGtask(listId: String?, taskId: String?) { try { execute(service!!.tasks().delete(listId, taskId)) } catch (ignored: HttpNotFoundException) { @@ -150,36 +152,35 @@ class GtasksInvoker { @Synchronized @Throws(IOException::class) - private fun execute(request: TasksRequest): T? { - return execute(request, false) - } + private suspend fun execute(request: TasksRequest): T? = execute(request, false) @Synchronized @Throws(IOException::class) - private fun execute(request: TasksRequest, retry: Boolean): T? { - credentialsAdapter!!.checkToken(account, TasksScopes.TASKS) - val response: T? - response = try { - val httpRequest = request.buildHttpRequest() - Timber.d("%s", httpRequest.url) - if (preferences.isFlipperEnabled) { - interceptor.execute(httpRequest, request.responseClass) - } else { - httpRequest.execute().parseAs(request.responseClass) - } - } catch (e: HttpResponseException) { - return if (e.statusCode == 401 && !retry) { - credentialsAdapter.invalidateToken() - execute(request, true) - } else if (e.statusCode == 404) { - throw HttpNotFoundException(e) - } else { - throw e + private suspend fun execute(request: TasksRequest, retry: Boolean): T? = + withContext(Dispatchers.IO) { + credentialsAdapter!!.checkToken(account, TasksScopes.TASKS) + val response: T? + response = try { + val httpRequest = request.buildHttpRequest() + Timber.d("%s", httpRequest.url) + if (preferences.isFlipperEnabled) { + interceptor.execute(httpRequest, request.responseClass) + } else { + httpRequest.execute().parseAs(request.responseClass) + } + } catch (e: HttpResponseException) { + return@withContext if (e.statusCode == 401 && !retry) { + credentialsAdapter.invalidateToken() + execute(request, true) + } else if (e.statusCode == 404) { + throw HttpNotFoundException(e) + } else { + throw e + } + } + Timber.d("%s response: %s", getCaller(retry), prettyPrint(response)) + response } - } - Timber.d("%s response: %s", getCaller(retry), prettyPrint(response)) - return response - } @Throws(IOException::class) private fun prettyPrint(`object`: T?): Any? { diff --git a/app/src/main/java/org/tasks/gtasks/GoogleTaskSynchronizer.kt b/app/src/main/java/org/tasks/gtasks/GoogleTaskSynchronizer.kt index 1ba292304..80974146c 100644 --- a/app/src/main/java/org/tasks/gtasks/GoogleTaskSynchronizer.kt +++ b/app/src/main/java/org/tasks/gtasks/GoogleTaskSynchronizer.kt @@ -8,7 +8,7 @@ import com.google.api.services.tasks.model.Tasks import com.google.common.collect.Lists import com.todoroo.andlib.utility.DateUtilities import com.todoroo.astrid.api.GtasksFilter -import com.todoroo.astrid.dao.TaskDaoBlocking +import com.todoroo.astrid.dao.TaskDao import com.todoroo.astrid.data.Task.Companion.createDueDate import com.todoroo.astrid.gtasks.GtasksListService import com.todoroo.astrid.gtasks.api.GtasksApiUtilities @@ -41,12 +41,12 @@ import kotlin.math.max class GoogleTaskSynchronizer @Inject constructor( @param:ApplicationContext private val context: Context, - private val googleTaskListDao: GoogleTaskListDaoBlocking, + private val googleTaskListDao: GoogleTaskListDao, private val gtasksListService: GtasksListService, private val preferences: Preferences, - private val taskDao: TaskDaoBlocking, + private val taskDao: TaskDao, private val firebase: Firebase, - private val googleTaskDao: GoogleTaskDaoBlocking, + private val googleTaskDao: GoogleTaskDao, private val taskCreator: TaskCreator, private val defaultFilterProvider: DefaultFilterProvider, private val permissionChecker: PermissionChecker, @@ -55,7 +55,8 @@ class GoogleTaskSynchronizer @Inject constructor( private val inventory: Inventory, private val taskDeleter: TaskDeleter, private val gtasksInvoker: GtasksInvoker) { - fun sync(account: GoogleTaskAccount, i: Int) { + + suspend fun sync(account: GoogleTaskAccount, i: Int) { Timber.d("%s: start sync", account) try { if (i == 0 || inventory.hasPro()) { @@ -99,13 +100,13 @@ class GoogleTaskSynchronizer @Inject constructor( } @Throws(IOException::class) - private fun synchronize(account: GoogleTaskAccount) { + private suspend fun synchronize(account: GoogleTaskAccount) { if (!permissionChecker.canAccessAccounts() || googleAccountManager.getAccount(account.account) == null) { account.error = context.getString(R.string.cannot_access_account) return } - val gtasksInvoker = gtasksInvoker.forAccount(account.account) + val gtasksInvoker = gtasksInvoker.forAccount(account.account!!) pushLocalChanges(account, gtasksInvoker) val gtaskLists: MutableList = ArrayList() var nextPageToken: String? = null @@ -151,12 +152,12 @@ class GoogleTaskSynchronizer @Inject constructor( } @Throws(IOException::class) - private fun fetchPositions( + private suspend fun fetchPositions( gtasksInvoker: GtasksInvoker, listId: String): List { val tasks: MutableList = ArrayList() var nextPageToken: String? = null do { - val taskList = gtasksInvoker.getAllPositions(listId, nextPageToken) ?: break + val taskList = gtasksInvoker.getAllPositions(listId, nextPageToken) val items = taskList.items if (items != null) { tasks.addAll(items) @@ -167,7 +168,7 @@ class GoogleTaskSynchronizer @Inject constructor( } @Throws(IOException::class) - private fun pushLocalChanges(account: GoogleTaskAccount, gtasksInvoker: GtasksInvoker) { + private suspend fun pushLocalChanges(account: GoogleTaskAccount, gtasksInvoker: GtasksInvoker) { val tasks = taskDao.getGoogleTasksToPush(account.account!!) for (task in tasks) { pushTask(task, gtasksInvoker) @@ -175,7 +176,7 @@ class GoogleTaskSynchronizer @Inject constructor( } @Throws(IOException::class) - private fun pushTask(task: com.todoroo.astrid.data.Task, gtasksInvoker: GtasksInvoker) { + private suspend fun pushTask(task: com.todoroo.astrid.data.Task, gtasksInvoker: GtasksInvoker) { for (deleted in googleTaskDao.getDeletedByTaskId(task.id)) { gtasksInvoker.deleteGtask(deleted.listId, deleted.remoteId) googleTaskDao.delete(deleted) @@ -284,7 +285,7 @@ class GoogleTaskSynchronizer @Inject constructor( @Synchronized @Throws(IOException::class) - private fun fetchAndApplyRemoteChanges( + private suspend fun fetchAndApplyRemoteChanges( gtasksInvoker: GtasksInvoker, list: GoogleTaskList) { val listId = list.remoteId var lastSyncDate = list.lastSync @@ -312,7 +313,7 @@ class GoogleTaskSynchronizer @Inject constructor( if (googleTask == null) { googleTask = GoogleTask(0, "") } else if (googleTask.task > 0) { - task = taskDao.fetchBlocking(googleTask.task) + task = taskDao.fetch(googleTask.task) } val updated = gtask.updated if (updated != null) { @@ -358,7 +359,7 @@ class GoogleTaskSynchronizer @Inject constructor( googleTaskListDao.insertOrReplace(list) } - private fun write(task: com.todoroo.astrid.data.Task?, googleTask: GoogleTask) { + private suspend fun write(task: com.todoroo.astrid.data.Task?, googleTask: GoogleTask) { if (!(isNullOrEmpty(task!!.title) && isNullOrEmpty(task.notes))) { task.suppressSync() task.suppressRefresh() diff --git a/app/src/main/java/org/tasks/jobs/SyncWork.kt b/app/src/main/java/org/tasks/jobs/SyncWork.kt index b950e122d..6f99c3740 100644 --- a/app/src/main/java/org/tasks/jobs/SyncWork.kt +++ b/app/src/main/java/org/tasks/jobs/SyncWork.kt @@ -4,6 +4,9 @@ import android.content.Context import androidx.hilt.Assisted import androidx.hilt.work.WorkerInject import androidx.work.WorkerParameters +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope import org.tasks.LocalBroadcastManager import org.tasks.analytics.Firebase import org.tasks.caldav.CaldavSynchronizer @@ -14,8 +17,6 @@ import org.tasks.gtasks.GoogleTaskSynchronizer import org.tasks.injection.BaseWorker import org.tasks.preferences.Preferences import org.tasks.sync.SyncAdapters -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit class SyncWork @WorkerInject constructor( @Assisted context: Context, @@ -53,24 +54,27 @@ class SyncWork @WorkerInject constructor( } @Throws(InterruptedException::class) - private suspend fun sync() { - val numThreads = Runtime.getRuntime().availableProcessors() - val executor = Executors.newFixedThreadPool(numThreads) - for (account in caldavDao.getAccounts()) { - executor.execute { - if (account.isCaldavAccount) { - caldavSynchronizer.sync(account) - } else if (account.isEteSyncAccount) { - eteSynchronizer.sync(account) + private suspend fun sync() = coroutineScope { + val deferredCaldav = caldavDao.getAccounts() + .map { + async(Dispatchers.IO) { + if (it.isCaldavAccount) { + caldavSynchronizer.sync(it) + } else if (it.isEteSyncAccount) { + eteSynchronizer.sync(it) + } + } } - } - } - val accounts = googleTaskListDao.getAccounts() - for (i in accounts.indices) { - executor.execute { googleTaskSynchronizer.sync(accounts[i], i) } - } - executor.shutdown() - executor.awaitTermination(15, TimeUnit.MINUTES) + val deferredGoogleTasks = googleTaskListDao + .getAccounts() + .mapIndexed { i, account -> + async(Dispatchers.IO) { + googleTaskSynchronizer.sync(account, i) + } + } + deferredCaldav + .plus(deferredGoogleTasks) + .forEach { it.await() } } companion object {