Throttle WorkManager calls to 200/min

pull/1061/head
Alex Baker 4 years ago
parent 1659afb34e
commit 640412314c

@ -4,9 +4,4 @@ object DbUtils {
const val MAX_SQLITE_ARGS = 990 const val MAX_SQLITE_ARGS = 990
fun <T> Iterable<T>.dbchunk(): List<List<T>> = chunked(MAX_SQLITE_ARGS) fun <T> Iterable<T>.dbchunk(): List<List<T>> = chunked(MAX_SQLITE_ARGS)
fun <T> Iterable<T>.eachChunk(action: (List<T>) -> Unit) = dbchunk().forEach(action)
fun <T, R> Iterable<T>.chunkedMap(transform: (List<T>) -> Iterable<R>): List<R> =
dbchunk().flatMap(transform)
} }

@ -1,10 +1,14 @@
package org.tasks.db package org.tasks.db
import org.tasks.db.DbUtils.MAX_SQLITE_ARGS
import org.tasks.db.DbUtils.dbchunk import org.tasks.db.DbUtils.dbchunk
object SuspendDbUtils { object SuspendDbUtils {
suspend fun <T> Iterable<T>.eachChunk(action: suspend (List<T>) -> Unit) = suspend fun <T> Iterable<T>.eachChunk(action: suspend (List<T>) -> Unit) =
dbchunk().forEach { action.invoke(it) } eachChunk(MAX_SQLITE_ARGS, action)
suspend fun <T> Iterable<T>.eachChunk(size: Int, action: suspend (List<T>) -> Unit) =
chunked(size).forEach { action.invoke(it) }
suspend fun <T, R> Iterable<T>.chunkedMap(transform: suspend (List<T>) -> Iterable<R>): List<R> = suspend fun <T, R> Iterable<T>.chunkedMap(transform: suspend (List<T>) -> Iterable<R>): List<R> =
dbchunk().flatMap { transform.invoke(it) } dbchunk().flatMap { transform.invoke(it) }

@ -30,7 +30,7 @@ class BackupWork @WorkerInject constructor(
return Result.success() return Result.success()
} }
override fun scheduleNext() = workManager.scheduleBackup() override suspend fun scheduleNext() = workManager.scheduleBackup()
private suspend fun startBackup(context: Context?) { private suspend fun startBackup(context: Context?) {
try { try {

@ -19,5 +19,5 @@ class MidnightRefreshWork @WorkerInject constructor(
return Result.success() return Result.success()
} }
override fun scheduleNext() = workManager.scheduleMidnightRefresh() override suspend fun scheduleNext() = workManager.scheduleMidnightRefresh()
} }

@ -20,5 +20,5 @@ class RefreshWork @WorkerInject constructor(
return Result.success() return Result.success()
} }
override fun scheduleNext() = refreshScheduler.scheduleNext() override suspend fun scheduleNext() = refreshScheduler.scheduleNext()
} }

@ -16,5 +16,5 @@ abstract class RepeatingWorker internal constructor(
return result return result
} }
protected abstract fun scheduleNext() protected abstract suspend fun scheduleNext()
} }

@ -7,32 +7,32 @@ import org.tasks.data.Place
interface WorkManager { interface WorkManager {
fun afterComplete(current: Task, original: Task?) suspend fun afterComplete(task: Task)
fun cleanup(ids: Iterable<Long>) suspend fun cleanup(ids: Iterable<Long>)
fun sync(immediate: Boolean) suspend fun sync(immediate: Boolean)
fun reverseGeocode(place: Place) suspend fun reverseGeocode(place: Place)
fun updateBackgroundSync() suspend fun updateBackgroundSync()
suspend fun updateBackgroundSync( suspend fun updateBackgroundSync(
forceAccountPresent: Boolean?, forceAccountPresent: Boolean?,
forceBackgroundEnabled: Boolean?, forceBackgroundEnabled: Boolean?,
forceOnlyOnUnmetered: Boolean?) forceOnlyOnUnmetered: Boolean?)
fun scheduleRefresh(time: Long) suspend fun scheduleRefresh(time: Long)
fun scheduleMidnightRefresh() suspend fun scheduleMidnightRefresh()
fun scheduleNotification(scheduledTime: Long) fun scheduleNotification(scheduledTime: Long)
fun scheduleBackup() suspend fun scheduleBackup()
fun scheduleConfigRefresh() suspend fun scheduleConfigRefresh()
fun scheduleDriveUpload(uri: Uri, purge: Boolean) suspend fun scheduleDriveUpload(uri: Uri, purge: Boolean)
fun cancelNotifications() fun cancelNotifications()

@ -15,9 +15,9 @@ import org.tasks.R
import org.tasks.data.CaldavDao import org.tasks.data.CaldavDao
import org.tasks.data.GoogleTaskListDao import org.tasks.data.GoogleTaskListDao
import org.tasks.data.Place import org.tasks.data.Place
import org.tasks.data.runBlocking
import org.tasks.date.DateTimeUtils.midnight import org.tasks.date.DateTimeUtils.midnight
import org.tasks.date.DateTimeUtils.newDateTime import org.tasks.date.DateTimeUtils.newDateTime
import org.tasks.db.SuspendDbUtils.eachChunk
import org.tasks.jobs.WorkManager.Companion.MAX_CLEANUP_LENGTH import org.tasks.jobs.WorkManager.Companion.MAX_CLEANUP_LENGTH
import org.tasks.jobs.WorkManager.Companion.REMOTE_CONFIG_INTERVAL_HOURS import org.tasks.jobs.WorkManager.Companion.REMOTE_CONFIG_INTERVAL_HOURS
import org.tasks.jobs.WorkManager.Companion.TAG_BACKGROUND_SYNC import org.tasks.jobs.WorkManager.Companion.TAG_BACKGROUND_SYNC
@ -26,6 +26,7 @@ import org.tasks.jobs.WorkManager.Companion.TAG_MIDNIGHT_REFRESH
import org.tasks.jobs.WorkManager.Companion.TAG_REFRESH import org.tasks.jobs.WorkManager.Companion.TAG_REFRESH
import org.tasks.jobs.WorkManager.Companion.TAG_REMOTE_CONFIG import org.tasks.jobs.WorkManager.Companion.TAG_REMOTE_CONFIG
import org.tasks.jobs.WorkManager.Companion.TAG_SYNC import org.tasks.jobs.WorkManager.Companion.TAG_SYNC
import org.tasks.notifications.Throttle
import org.tasks.preferences.Preferences import org.tasks.preferences.Preferences
import org.tasks.time.DateTimeUtils import org.tasks.time.DateTimeUtils
import timber.log.Timber import timber.log.Timber
@ -38,32 +39,42 @@ class WorkManagerImpl constructor(
private val preferences: Preferences, private val preferences: Preferences,
private val googleTaskListDao: GoogleTaskListDao, private val googleTaskListDao: GoogleTaskListDao,
private val caldavDao: CaldavDao): WorkManager { private val caldavDao: CaldavDao): WorkManager {
private val throttle = Throttle(200, 60000, "WORK")
private val alarmManager: AlarmManager = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager private val alarmManager: AlarmManager = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager
private val workManager = androidx.work.WorkManager.getInstance(context) private val workManager = androidx.work.WorkManager.getInstance(context)
override fun afterComplete(current: Task, original: Task?) { private suspend fun enqueue(builder: WorkRequest.Builder<*, *>) {
workManager.enqueue( throttle.run {
workManager.enqueue(builder.build())
}
}
private suspend fun enqueue(continuation: WorkContinuation) {
throttle.run {
continuation.enqueue()
}
}
override suspend fun afterComplete(task: Task) {
enqueue(
OneTimeWorkRequest.Builder(AfterSaveWork::class.java) OneTimeWorkRequest.Builder(AfterSaveWork::class.java)
.setInputData(Data.Builder() .setInputData(Data.Builder()
.putLong(AfterSaveWork.EXTRA_ID, current.id) .putLong(AfterSaveWork.EXTRA_ID, task.id)
.build()) .build()))
.build())
} }
override fun cleanup(ids: Iterable<Long>) { override suspend fun cleanup(ids: Iterable<Long>) {
ids.chunked(MAX_CLEANUP_LENGTH) { ids.eachChunk(MAX_CLEANUP_LENGTH) {
workManager.enqueue( enqueue(
OneTimeWorkRequest.Builder(CleanupWork::class.java) OneTimeWorkRequest.Builder(CleanupWork::class.java)
.setInputData( .setInputData(
Data.Builder() Data.Builder()
.putLongArray(CleanupWork.EXTRA_TASK_IDS, it.toLongArray()) .putLongArray(CleanupWork.EXTRA_TASK_IDS, it.toLongArray())
.build()) .build()))
.build())
} }
} }
override fun sync(immediate: Boolean) { override suspend fun sync(immediate: Boolean) {
val constraints = Constraints.Builder() val constraints = Constraints.Builder()
.setRequiredNetworkType( .setRequiredNetworkType(
if (!immediate && preferences.getBoolean(R.string.p_background_sync_unmetered_only, false)) { if (!immediate && preferences.getBoolean(R.string.p_background_sync_unmetered_only, false)) {
@ -78,24 +89,26 @@ class WorkManagerImpl constructor(
if (!immediate) { if (!immediate) {
builder.setInitialDelay(1, TimeUnit.MINUTES) builder.setInitialDelay(1, TimeUnit.MINUTES)
} }
val request = builder.build() throttle.run {
workManager.beginUniqueWork(TAG_SYNC, ExistingWorkPolicy.REPLACE, request).enqueue() workManager
.beginUniqueWork(TAG_SYNC, ExistingWorkPolicy.REPLACE, builder.build())
.enqueue()
}
} }
override fun reverseGeocode(place: Place) { override suspend fun reverseGeocode(place: Place) {
if (BuildConfig.DEBUG && place.id == 0L) { if (BuildConfig.DEBUG && place.id == 0L) {
throw RuntimeException("Missing id") throw RuntimeException("Missing id")
} }
workManager.enqueue( enqueue(
OneTimeWorkRequest.Builder(ReverseGeocodeWork::class.java) OneTimeWorkRequest.Builder(ReverseGeocodeWork::class.java)
.setInputData(Data.Builder().putLong(ReverseGeocodeWork.PLACE_ID, place.id).build()) .setInputData(Data.Builder().putLong(ReverseGeocodeWork.PLACE_ID, place.id).build())
.setBackoffCriteria(BackoffPolicy.EXPONENTIAL, 1, TimeUnit.MINUTES) .setBackoffCriteria(BackoffPolicy.EXPONENTIAL, 1, TimeUnit.MINUTES)
.setConstraints( .setConstraints(
Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()) Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()))
.build())
} }
override fun updateBackgroundSync() = runBlocking { override suspend fun updateBackgroundSync() {
updateBackgroundSync(null, null, null) updateBackgroundSync(null, null, null)
} }
@ -114,8 +127,9 @@ class WorkManagerImpl constructor(
scheduleBackgroundSync(backgroundEnabled && accountsPresent, onlyOnWifi) scheduleBackgroundSync(backgroundEnabled && accountsPresent, onlyOnWifi)
} }
private fun scheduleBackgroundSync(enabled: Boolean, onlyOnUnmetered: Boolean) { private suspend fun scheduleBackgroundSync(enabled: Boolean, onlyOnUnmetered: Boolean) {
Timber.d("background sync enabled: %s, onlyOnUnmetered: %s", enabled, onlyOnUnmetered) Timber.d("background sync enabled: %s, onlyOnUnmetered: %s", enabled, onlyOnUnmetered)
throttle.run {
if (enabled) { if (enabled) {
workManager.enqueueUniquePeriodicWork( workManager.enqueueUniquePeriodicWork(
TAG_BACKGROUND_SYNC, TAG_BACKGROUND_SYNC,
@ -128,10 +142,11 @@ class WorkManagerImpl constructor(
workManager.cancelUniqueWork(TAG_BACKGROUND_SYNC) workManager.cancelUniqueWork(TAG_BACKGROUND_SYNC)
} }
} }
}
override fun scheduleRefresh(time: Long) = enqueueUnique(TAG_REFRESH, RefreshWork::class.java, time) override suspend fun scheduleRefresh(time: Long) = enqueueUnique(TAG_REFRESH, RefreshWork::class.java, time)
override fun scheduleMidnightRefresh() = override suspend fun scheduleMidnightRefresh() =
enqueueUnique(TAG_MIDNIGHT_REFRESH, MidnightRefreshWork::class.java, midnight()) enqueueUnique(TAG_MIDNIGHT_REFRESH, MidnightRefreshWork::class.java, midnight())
override fun scheduleNotification(scheduledTime: Long) { override fun scheduleNotification(scheduledTime: Long) {
@ -149,7 +164,7 @@ class WorkManagerImpl constructor(
} }
} }
override fun scheduleBackup() { override suspend fun scheduleBackup() =
enqueueUnique( enqueueUnique(
TAG_BACKUP, TAG_BACKUP,
BackupWork::class.java, BackupWork::class.java,
@ -157,9 +172,9 @@ class WorkManagerImpl constructor(
.plusDays(1) .plusDays(1)
.millis .millis
.coerceAtMost(midnight())) .coerceAtMost(midnight()))
}
override fun scheduleConfigRefresh() { override suspend fun scheduleConfigRefresh() {
throttle.run {
workManager.enqueueUniquePeriodicWork( workManager.enqueueUniquePeriodicWork(
TAG_REMOTE_CONFIG, TAG_REMOTE_CONFIG,
ExistingPeriodicWorkPolicy.KEEP, ExistingPeriodicWorkPolicy.KEEP,
@ -170,8 +185,9 @@ class WorkManagerImpl constructor(
Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()) Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build())
.build()) .build())
} }
}
override fun scheduleDriveUpload(uri: Uri, purge: Boolean) { override suspend fun scheduleDriveUpload(uri: Uri, purge: Boolean) {
if (!preferences.getBoolean(R.string.p_google_drive_backup, false)) { if (!preferences.getBoolean(R.string.p_google_drive_backup, false)) {
return return
} }
@ -181,7 +197,7 @@ class WorkManagerImpl constructor(
if (purge) { if (purge) {
builder.setInitialDelay(Random().nextInt(3600).toLong(), TimeUnit.SECONDS) builder.setInitialDelay(Random().nextInt(3600).toLong(), TimeUnit.SECONDS)
} }
workManager.enqueue(builder.build()) enqueue(builder)
} }
private val networkConstraints: Constraints private val networkConstraints: Constraints
@ -193,14 +209,15 @@ class WorkManagerImpl constructor(
.setRequiredNetworkType(if (unmeteredOnly) NetworkType.UNMETERED else NetworkType.CONNECTED) .setRequiredNetworkType(if (unmeteredOnly) NetworkType.UNMETERED else NetworkType.CONNECTED)
.build() .build()
private fun enqueueUnique(key: String, c: Class<out CoroutineWorker?>, time: Long) { @SuppressLint("EnqueueWork")
private suspend fun enqueueUnique(key: String, c: Class<out CoroutineWorker?>, time: Long) {
val delay = time - DateUtilities.now() val delay = time - DateUtilities.now()
val builder = OneTimeWorkRequest.Builder(c) val builder = OneTimeWorkRequest.Builder(c)
if (delay > 0) { if (delay > 0) {
builder.setInitialDelay(delay, TimeUnit.MILLISECONDS) builder.setInitialDelay(delay, TimeUnit.MILLISECONDS)
} }
Timber.d("$key: ${DateTimeUtils.printTimestamp(time)} (${DateTimeUtils.printDuration(delay)})") Timber.d("$key: ${DateTimeUtils.printTimestamp(time)} (${DateTimeUtils.printDuration(delay)})")
workManager.beginUniqueWork(key, ExistingWorkPolicy.REPLACE, builder.build()).enqueue() enqueue(workManager.beginUniqueWork(key, ExistingWorkPolicy.REPLACE, builder.build()))
} }
override fun cancelNotifications() { override fun cancelNotifications() {

@ -94,13 +94,17 @@ class Synchronization : InjectingPreferenceFragment() {
override fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent?) { override fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
if (requestCode == REQUEST_CALDAV_SETTINGS) { if (requestCode == REQUEST_CALDAV_SETTINGS) {
if (resultCode == Activity.RESULT_OK) { if (resultCode == Activity.RESULT_OK) {
lifecycleScope.launch(NonCancellable) {
workManager.sync(true) workManager.sync(true)
workManager.updateBackgroundSync() workManager.updateBackgroundSync()
} }
}
} else if (requestCode == REQUEST_GOOGLE_TASKS) { } else if (requestCode == REQUEST_GOOGLE_TASKS) {
if (resultCode == Activity.RESULT_OK) { if (resultCode == Activity.RESULT_OK) {
lifecycleScope.launch(NonCancellable) {
workManager.sync(true) workManager.sync(true)
workManager.updateBackgroundSync() workManager.updateBackgroundSync()
}
} else if (data != null) { } else if (data != null) {
toaster.longToast(data.getStringExtra(GtasksLoginActivity.EXTRA_ERROR)) toaster.longToast(data.getStringExtra(GtasksLoginActivity.EXTRA_ERROR))
} }

@ -28,7 +28,7 @@ class RefreshScheduler @Inject internal constructor(
} }
@Synchronized @Synchronized
fun scheduleRefresh(task: Task) { suspend fun scheduleRefresh(task: Task) {
if (task.isCompleted if (task.isCompleted
&& preferences.getBoolean(R.string.p_temporarily_show_completed_tasks, false)) { && preferences.getBoolean(R.string.p_temporarily_show_completed_tasks, false)) {
scheduleRefresh(task.completionDate + DateUtilities.ONE_MINUTE) scheduleRefresh(task.completionDate + DateUtilities.ONE_MINUTE)
@ -41,7 +41,7 @@ class RefreshScheduler @Inject internal constructor(
} }
@Synchronized @Synchronized
fun scheduleNext() { suspend fun scheduleNext() {
val lapsed = jobs.headSet(DateTimeUtils.currentTimeMillis() + 1).toImmutableList() val lapsed = jobs.headSet(DateTimeUtils.currentTimeMillis() + 1).toImmutableList()
jobs.removeAll(lapsed) jobs.removeAll(lapsed)
if (!jobs.isEmpty()) { if (!jobs.isEmpty()) {
@ -49,7 +49,7 @@ class RefreshScheduler @Inject internal constructor(
} }
} }
private fun scheduleRefresh(timestamp: Long) { private suspend fun scheduleRefresh(timestamp: Long) {
val now = DateTimeUtils.currentTimeMillis() val now = DateTimeUtils.currentTimeMillis()
if (now < timestamp) { if (now < timestamp) {
val upcoming = jobs.tailSet(now) val upcoming = jobs.tailSet(now)

Loading…
Cancel
Save