Use coroutines in GoogleTaskSynchronizer

pull/1051/head
Alex Baker 6 years ago
parent b7b8d1667b
commit 2454757b24

@ -12,8 +12,11 @@ import com.google.api.services.tasks.model.Task
import com.google.api.services.tasks.model.TaskList
import com.google.api.services.tasks.model.TaskLists
import dagger.hilt.android.qualifiers.ApplicationContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.tasks.BuildConfig
import org.tasks.DebugNetworkInterceptor
import org.tasks.data.runBlocking
import org.tasks.gtasks.GoogleAccountManager
import org.tasks.preferences.Preferences
import timber.log.Timber
@ -72,14 +75,13 @@ class GtasksInvoker {
}
@Throws(IOException::class)
fun allGtaskLists(pageToken: String?): TaskLists? {
return execute(service!!.tasklists().list().setMaxResults(100L).setPageToken(pageToken))
}
suspend fun allGtaskLists(pageToken: String?): TaskLists? =
execute(service!!.tasklists().list().setMaxResults(100L).setPageToken(pageToken))
@Throws(IOException::class)
fun getAllGtasksFromListId(
listId: String?, lastSyncDate: Long, pageToken: String?): com.google.api.services.tasks.model.Tasks? {
return execute(
suspend fun getAllGtasksFromListId(
listId: String?, lastSyncDate: Long, pageToken: String?): com.google.api.services.tasks.model.Tasks? =
execute(
service!!
.tasks()
.list(listId)
@ -89,12 +91,11 @@ class GtasksInvoker {
.setPageToken(pageToken)
.setUpdatedMin(
GtasksApiUtilities.unixTimeToGtasksCompletionTime(lastSyncDate).toStringRfc3339()))
}
@Throws(IOException::class)
fun getAllPositions(
listId: String?, pageToken: String?): com.google.api.services.tasks.model.Tasks {
return execute(
suspend fun getAllPositions(
listId: String?, pageToken: String?): com.google.api.services.tasks.model.Tasks =
execute(
service!!
.tasks()
.list(listId)
@ -103,27 +104,28 @@ class GtasksInvoker {
.setShowHidden(false)
.setPageToken(pageToken)
.setFields("items(id,parent,position),nextPageToken"))!!
}
@Throws(IOException::class)
fun createGtask(
listId: String?, task: Task?, parent: String?, previous: String?): Task? {
return execute(service!!.tasks().insert(listId, task).setParent(parent).setPrevious(previous))
}
suspend fun createGtask(
listId: String?, task: Task?, parent: String?, previous: String?): Task? =
execute(service!!.tasks().insert(listId, task).setParent(parent).setPrevious(previous))
@Throws(IOException::class)
fun updateGtask(listId: String?, task: Task) {
execute(service!!.tasks().update(listId, task.id, task))
}
suspend fun updateGtask(listId: String?, task: Task) =
execute(service!!.tasks().update(listId, task.id, task))
@Throws(IOException::class)
fun moveGtask(listId: String?, taskId: String?, parentId: String?, previousId: String?): Task? {
return execute(
service!!.tasks().move(listId, taskId).setParent(parentId).setPrevious(previousId))
}
suspend fun moveGtask(
listId: String?, taskId: String?, parentId: String?, previousId: String?): Task? =
execute(
service!!
.tasks()
.move(listId, taskId)
.setParent(parentId)
.setPrevious(previousId))
@Throws(IOException::class)
fun deleteGtaskList(listId: String?) {
fun deleteGtaskList(listId: String?) = runBlocking {
try {
execute(service!!.tasklists().delete(listId))
} catch (ignored: HttpNotFoundException) {
@ -131,17 +133,17 @@ class GtasksInvoker {
}
@Throws(IOException::class)
fun renameGtaskList(listId: String?, title: String?): TaskList? {
return execute(service!!.tasklists().patch(listId, TaskList().setTitle(title)))
fun renameGtaskList(listId: String?, title: String?): TaskList? = runBlocking {
execute(service!!.tasklists().patch(listId, TaskList().setTitle(title)))
}
@Throws(IOException::class)
fun createGtaskList(title: String?): TaskList? {
return execute(service!!.tasklists().insert(TaskList().setTitle(title)))
fun createGtaskList(title: String?): TaskList? = runBlocking {
execute(service!!.tasklists().insert(TaskList().setTitle(title)))
}
@Throws(IOException::class)
fun deleteGtask(listId: String?, taskId: String?) {
suspend fun deleteGtask(listId: String?, taskId: String?) {
try {
execute(service!!.tasks().delete(listId, taskId))
} catch (ignored: HttpNotFoundException) {
@ -150,36 +152,35 @@ class GtasksInvoker {
@Synchronized
@Throws(IOException::class)
private fun <T> execute(request: TasksRequest<T>): T? {
return execute(request, false)
}
private suspend fun <T> execute(request: TasksRequest<T>): T? = execute(request, false)
@Synchronized
@Throws(IOException::class)
private fun <T> execute(request: TasksRequest<T>, retry: Boolean): T? {
credentialsAdapter!!.checkToken(account, TasksScopes.TASKS)
val response: T?
response = try {
val httpRequest = request.buildHttpRequest()
Timber.d("%s", httpRequest.url)
if (preferences.isFlipperEnabled) {
interceptor.execute(httpRequest, request.responseClass)
} else {
httpRequest.execute().parseAs(request.responseClass)
}
} catch (e: HttpResponseException) {
return if (e.statusCode == 401 && !retry) {
credentialsAdapter.invalidateToken()
execute(request, true)
} else if (e.statusCode == 404) {
throw HttpNotFoundException(e)
} else {
throw e
private suspend fun <T> execute(request: TasksRequest<T>, retry: Boolean): T? =
withContext(Dispatchers.IO) {
credentialsAdapter!!.checkToken(account, TasksScopes.TASKS)
val response: T?
response = try {
val httpRequest = request.buildHttpRequest()
Timber.d("%s", httpRequest.url)
if (preferences.isFlipperEnabled) {
interceptor.execute(httpRequest, request.responseClass)
} else {
httpRequest.execute().parseAs(request.responseClass)
}
} catch (e: HttpResponseException) {
return@withContext if (e.statusCode == 401 && !retry) {
credentialsAdapter.invalidateToken()
execute(request, true)
} else if (e.statusCode == 404) {
throw HttpNotFoundException(e)
} else {
throw e
}
}
Timber.d("%s response: %s", getCaller(retry), prettyPrint(response))
response
}
}
Timber.d("%s response: %s", getCaller(retry), prettyPrint(response))
return response
}
@Throws(IOException::class)
private fun <T> prettyPrint(`object`: T?): Any? {

@ -8,7 +8,7 @@ import com.google.api.services.tasks.model.Tasks
import com.google.common.collect.Lists
import com.todoroo.andlib.utility.DateUtilities
import com.todoroo.astrid.api.GtasksFilter
import com.todoroo.astrid.dao.TaskDaoBlocking
import com.todoroo.astrid.dao.TaskDao
import com.todoroo.astrid.data.Task.Companion.createDueDate
import com.todoroo.astrid.gtasks.GtasksListService
import com.todoroo.astrid.gtasks.api.GtasksApiUtilities
@ -41,12 +41,12 @@ import kotlin.math.max
class GoogleTaskSynchronizer @Inject constructor(
@param:ApplicationContext private val context: Context,
private val googleTaskListDao: GoogleTaskListDaoBlocking,
private val googleTaskListDao: GoogleTaskListDao,
private val gtasksListService: GtasksListService,
private val preferences: Preferences,
private val taskDao: TaskDaoBlocking,
private val taskDao: TaskDao,
private val firebase: Firebase,
private val googleTaskDao: GoogleTaskDaoBlocking,
private val googleTaskDao: GoogleTaskDao,
private val taskCreator: TaskCreator,
private val defaultFilterProvider: DefaultFilterProvider,
private val permissionChecker: PermissionChecker,
@ -55,7 +55,8 @@ class GoogleTaskSynchronizer @Inject constructor(
private val inventory: Inventory,
private val taskDeleter: TaskDeleter,
private val gtasksInvoker: GtasksInvoker) {
fun sync(account: GoogleTaskAccount, i: Int) {
suspend fun sync(account: GoogleTaskAccount, i: Int) {
Timber.d("%s: start sync", account)
try {
if (i == 0 || inventory.hasPro()) {
@ -99,13 +100,13 @@ class GoogleTaskSynchronizer @Inject constructor(
}
@Throws(IOException::class)
private fun synchronize(account: GoogleTaskAccount) {
private suspend fun synchronize(account: GoogleTaskAccount) {
if (!permissionChecker.canAccessAccounts()
|| googleAccountManager.getAccount(account.account) == null) {
account.error = context.getString(R.string.cannot_access_account)
return
}
val gtasksInvoker = gtasksInvoker.forAccount(account.account)
val gtasksInvoker = gtasksInvoker.forAccount(account.account!!)
pushLocalChanges(account, gtasksInvoker)
val gtaskLists: MutableList<TaskList> = ArrayList()
var nextPageToken: String? = null
@ -151,12 +152,12 @@ class GoogleTaskSynchronizer @Inject constructor(
}
@Throws(IOException::class)
private fun fetchPositions(
private suspend fun fetchPositions(
gtasksInvoker: GtasksInvoker, listId: String): List<Task> {
val tasks: MutableList<Task> = ArrayList()
var nextPageToken: String? = null
do {
val taskList = gtasksInvoker.getAllPositions(listId, nextPageToken) ?: break
val taskList = gtasksInvoker.getAllPositions(listId, nextPageToken)
val items = taskList.items
if (items != null) {
tasks.addAll(items)
@ -167,7 +168,7 @@ class GoogleTaskSynchronizer @Inject constructor(
}
@Throws(IOException::class)
private fun pushLocalChanges(account: GoogleTaskAccount, gtasksInvoker: GtasksInvoker) {
private suspend fun pushLocalChanges(account: GoogleTaskAccount, gtasksInvoker: GtasksInvoker) {
val tasks = taskDao.getGoogleTasksToPush(account.account!!)
for (task in tasks) {
pushTask(task, gtasksInvoker)
@ -175,7 +176,7 @@ class GoogleTaskSynchronizer @Inject constructor(
}
@Throws(IOException::class)
private fun pushTask(task: com.todoroo.astrid.data.Task, gtasksInvoker: GtasksInvoker) {
private suspend fun pushTask(task: com.todoroo.astrid.data.Task, gtasksInvoker: GtasksInvoker) {
for (deleted in googleTaskDao.getDeletedByTaskId(task.id)) {
gtasksInvoker.deleteGtask(deleted.listId, deleted.remoteId)
googleTaskDao.delete(deleted)
@ -284,7 +285,7 @@ class GoogleTaskSynchronizer @Inject constructor(
@Synchronized
@Throws(IOException::class)
private fun fetchAndApplyRemoteChanges(
private suspend fun fetchAndApplyRemoteChanges(
gtasksInvoker: GtasksInvoker, list: GoogleTaskList) {
val listId = list.remoteId
var lastSyncDate = list.lastSync
@ -312,7 +313,7 @@ class GoogleTaskSynchronizer @Inject constructor(
if (googleTask == null) {
googleTask = GoogleTask(0, "")
} else if (googleTask.task > 0) {
task = taskDao.fetchBlocking(googleTask.task)
task = taskDao.fetch(googleTask.task)
}
val updated = gtask.updated
if (updated != null) {
@ -358,7 +359,7 @@ class GoogleTaskSynchronizer @Inject constructor(
googleTaskListDao.insertOrReplace(list)
}
private fun write(task: com.todoroo.astrid.data.Task?, googleTask: GoogleTask) {
private suspend fun write(task: com.todoroo.astrid.data.Task?, googleTask: GoogleTask) {
if (!(isNullOrEmpty(task!!.title) && isNullOrEmpty(task.notes))) {
task.suppressSync()
task.suppressRefresh()

@ -4,6 +4,9 @@ import android.content.Context
import androidx.hilt.Assisted
import androidx.hilt.work.WorkerInject
import androidx.work.WorkerParameters
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import org.tasks.LocalBroadcastManager
import org.tasks.analytics.Firebase
import org.tasks.caldav.CaldavSynchronizer
@ -14,8 +17,6 @@ import org.tasks.gtasks.GoogleTaskSynchronizer
import org.tasks.injection.BaseWorker
import org.tasks.preferences.Preferences
import org.tasks.sync.SyncAdapters
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
class SyncWork @WorkerInject constructor(
@Assisted context: Context,
@ -53,24 +54,27 @@ class SyncWork @WorkerInject constructor(
}
@Throws(InterruptedException::class)
private suspend fun sync() {
val numThreads = Runtime.getRuntime().availableProcessors()
val executor = Executors.newFixedThreadPool(numThreads)
for (account in caldavDao.getAccounts()) {
executor.execute {
if (account.isCaldavAccount) {
caldavSynchronizer.sync(account)
} else if (account.isEteSyncAccount) {
eteSynchronizer.sync(account)
private suspend fun sync() = coroutineScope {
val deferredCaldav = caldavDao.getAccounts()
.map {
async(Dispatchers.IO) {
if (it.isCaldavAccount) {
caldavSynchronizer.sync(it)
} else if (it.isEteSyncAccount) {
eteSynchronizer.sync(it)
}
}
}
}
}
val accounts = googleTaskListDao.getAccounts()
for (i in accounts.indices) {
executor.execute { googleTaskSynchronizer.sync(accounts[i], i) }
}
executor.shutdown()
executor.awaitTermination(15, TimeUnit.MINUTES)
val deferredGoogleTasks = googleTaskListDao
.getAccounts()
.mapIndexed { i, account ->
async(Dispatchers.IO) {
googleTaskSynchronizer.sync(account, i)
}
}
deferredCaldav
.plus(deferredGoogleTasks)
.forEach { it.await() }
}
companion object {

Loading…
Cancel
Save