Initiate work requests from background thread

pull/1061/head
Alex Baker 5 years ago
parent 6ee6e96a7f
commit e9ab544b34

@ -7,34 +7,35 @@ import org.tasks.data.Place
interface WorkManager { interface WorkManager {
suspend fun afterComplete(task: Task) fun afterComplete(task: Task)
suspend fun cleanup(ids: Iterable<Long>) fun cleanup(ids: Iterable<Long>)
suspend fun googleTaskSync(immediate: Boolean) fun googleTaskSync(immediate: Boolean)
suspend fun caldavSync(immediate: Boolean) fun caldavSync(immediate: Boolean)
suspend fun eteSync(immediate: Boolean) fun eteSync(immediate: Boolean)
suspend fun reverseGeocode(place: Place)
suspend fun updateBackgroundSync() fun reverseGeocode(place: Place)
suspend fun updateBackgroundSync( fun updateBackgroundSync()
fun updateBackgroundSync(
forceBackgroundEnabled: Boolean?, forceOnlyOnUnmetered: Boolean?) forceBackgroundEnabled: Boolean?, forceOnlyOnUnmetered: Boolean?)
suspend fun scheduleRefresh(time: Long) fun scheduleRefresh(time: Long)
suspend fun scheduleMidnightRefresh() fun scheduleMidnightRefresh()
fun scheduleNotification(scheduledTime: Long) fun scheduleNotification(scheduledTime: Long)
suspend fun scheduleBackup() fun scheduleBackup()
suspend fun scheduleConfigRefresh() fun scheduleConfigRefresh()
suspend fun scheduleDriveUpload(uri: Uri, purge: Boolean) fun scheduleDriveUpload(uri: Uri, purge: Boolean)
fun cancelNotifications() fun cancelNotifications()

@ -19,7 +19,6 @@ import org.tasks.data.GoogleTaskListDao
import org.tasks.data.Place import org.tasks.data.Place
import org.tasks.date.DateTimeUtils.midnight import org.tasks.date.DateTimeUtils.midnight
import org.tasks.date.DateTimeUtils.newDateTime import org.tasks.date.DateTimeUtils.newDateTime
import org.tasks.db.SuspendDbUtils.eachChunk
import org.tasks.jobs.WorkManager.Companion.MAX_CLEANUP_LENGTH import org.tasks.jobs.WorkManager.Companion.MAX_CLEANUP_LENGTH
import org.tasks.jobs.WorkManager.Companion.REMOTE_CONFIG_INTERVAL_HOURS import org.tasks.jobs.WorkManager.Companion.REMOTE_CONFIG_INTERVAL_HOURS
import org.tasks.jobs.WorkManager.Companion.TAG_BACKGROUND_SYNC_CALDAV import org.tasks.jobs.WorkManager.Companion.TAG_BACKGROUND_SYNC_CALDAV
@ -49,19 +48,7 @@ class WorkManagerImpl constructor(
private val alarmManager: AlarmManager = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager private val alarmManager: AlarmManager = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager
private val workManager = androidx.work.WorkManager.getInstance(context) private val workManager = androidx.work.WorkManager.getInstance(context)
private suspend fun enqueue(builder: WorkRequest.Builder<*, *>) { override fun afterComplete(task: Task) {
throttle.run {
workManager.enqueue(builder.build())
}
}
private suspend fun enqueue(continuation: WorkContinuation) {
throttle.run {
continuation.enqueue()
}
}
override suspend fun afterComplete(task: Task) {
enqueue( enqueue(
OneTimeWorkRequest.Builder(AfterSaveWork::class.java) OneTimeWorkRequest.Builder(AfterSaveWork::class.java)
.setInputData(Data.Builder() .setInputData(Data.Builder()
@ -69,8 +56,8 @@ class WorkManagerImpl constructor(
.build())) .build()))
} }
override suspend fun cleanup(ids: Iterable<Long>) { override fun cleanup(ids: Iterable<Long>) {
ids.eachChunk(MAX_CLEANUP_LENGTH) { ids.chunked(MAX_CLEANUP_LENGTH) {
enqueue( enqueue(
OneTimeWorkRequest.Builder(CleanupWork::class.java) OneTimeWorkRequest.Builder(CleanupWork::class.java)
.setInputData( .setInputData(
@ -80,36 +67,36 @@ class WorkManagerImpl constructor(
} }
} }
override suspend fun googleTaskSync(immediate: Boolean) = override fun googleTaskSync(immediate: Boolean) =
sync(immediate, TAG_SYNC_GOOGLE_TASKS, SyncGoogleTasksWork::class.java) sync(immediate, TAG_SYNC_GOOGLE_TASKS, SyncGoogleTasksWork::class.java)
override suspend fun caldavSync(immediate: Boolean) = override fun caldavSync(immediate: Boolean) =
sync(immediate, TAG_SYNC_CALDAV, SyncCaldavWork::class.java) sync(immediate, TAG_SYNC_CALDAV, SyncCaldavWork::class.java)
override suspend fun eteSync(immediate: Boolean) = override fun eteSync(immediate: Boolean) =
sync(immediate, TAG_SYNC_ETESYNC, SyncEteSyncWork::class.java) sync(immediate, TAG_SYNC_ETESYNC, SyncEteSyncWork::class.java)
private suspend fun sync(immediate: Boolean, tag: String, c: Class<out SyncWork>) { @SuppressLint("EnqueueWork")
val constraints = Constraints.Builder() private fun sync(immediate: Boolean, tag: String, c: Class<out SyncWork>, requireNetwork: Boolean = true) {
Timber.d("sync(immediate = $immediate, $tag, $c, requireNetwork = $requireNetwork)")
val builder = OneTimeWorkRequest.Builder(c)
if (requireNetwork) {
builder.setConstraints(Constraints.Builder()
.setRequiredNetworkType( .setRequiredNetworkType(
if (!immediate && preferences.getBoolean(R.string.p_background_sync_unmetered_only, false)) { if (!immediate && preferences.getBoolean(R.string.p_background_sync_unmetered_only, false)) {
NetworkType.UNMETERED NetworkType.UNMETERED
} else { } else {
NetworkType.CONNECTED NetworkType.CONNECTED
}) })
.build() .build())
val builder = OneTimeWorkRequest.Builder(c).setConstraints(constraints) }
if (!immediate) { if (!immediate) {
builder.setInitialDelay(1, TimeUnit.MINUTES) builder.setInitialDelay(1, TimeUnit.MINUTES)
} }
throttle.run { enqueue(workManager.beginUniqueWork(tag, ExistingWorkPolicy.REPLACE, builder.build()))
workManager
.beginUniqueWork(tag, ExistingWorkPolicy.REPLACE, builder.build())
.enqueue()
}
} }
override suspend fun reverseGeocode(place: Place) { override fun reverseGeocode(place: Place) {
if (BuildConfig.DEBUG && place.id == 0L) { if (BuildConfig.DEBUG && place.id == 0L) {
throw RuntimeException("Missing id") throw RuntimeException("Missing id")
} }
@ -120,39 +107,43 @@ class WorkManagerImpl constructor(
Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build())) Constraints.Builder().setRequiredNetworkType(NetworkType.CONNECTED).build()))
} }
override suspend fun updateBackgroundSync() { override fun updateBackgroundSync() {
updateBackgroundSync(null, null) updateBackgroundSync(null, null)
} }
@SuppressLint("CheckResult") @SuppressLint("CheckResult")
override suspend fun updateBackgroundSync( override fun updateBackgroundSync(
forceBackgroundEnabled: Boolean?, forceOnlyOnUnmetered: Boolean?) { forceBackgroundEnabled: Boolean?, forceOnlyOnUnmetered: Boolean?) {
val enabled = forceBackgroundEnabled val enabled = forceBackgroundEnabled
?: preferences.getBoolean(R.string.p_background_sync, true) ?: preferences.getBoolean(R.string.p_background_sync, true)
val unmetered = forceOnlyOnUnmetered val unmetered = forceOnlyOnUnmetered
?: preferences.getBoolean(R.string.p_background_sync_unmetered_only, false) ?: preferences.getBoolean(R.string.p_background_sync_unmetered_only, false)
throttle.run {
scheduleBackgroundSync( scheduleBackgroundSync(
TAG_BACKGROUND_SYNC_GOOGLE_TASKS, TAG_BACKGROUND_SYNC_GOOGLE_TASKS,
SyncGoogleTasksWork::class.java, SyncGoogleTasksWork::class.java,
enabled && googleTaskListDao.accountCount() > 0, enabled && googleTaskListDao.accountCount() > 0,
unmetered) unmetered)
}
throttle.run {
scheduleBackgroundSync( scheduleBackgroundSync(
TAG_BACKGROUND_SYNC_CALDAV, TAG_BACKGROUND_SYNC_CALDAV,
SyncCaldavWork::class.java, SyncCaldavWork::class.java,
enabled && caldavDao.getAccounts(TYPE_CALDAV).isNotEmpty(), enabled && caldavDao.getAccounts(TYPE_CALDAV).isNotEmpty(),
unmetered) unmetered)
}
throttle.run {
scheduleBackgroundSync( scheduleBackgroundSync(
TAG_BACKGROUND_SYNC_ETESYNC, TAG_BACKGROUND_SYNC_ETESYNC,
SyncEteSyncWork::class.java, SyncEteSyncWork::class.java,
enabled && caldavDao.getAccounts(TYPE_ETESYNC).isNotEmpty(), enabled && caldavDao.getAccounts(TYPE_ETESYNC).isNotEmpty(),
unmetered) unmetered)
} }
}
private suspend fun scheduleBackgroundSync( private fun scheduleBackgroundSync(
tag: String, c: Class<out SyncWork>, enabled: Boolean, unmetered: Boolean? = null) { tag: String, c: Class<out SyncWork>, enabled: Boolean, unmetered: Boolean? = null) {
Timber.d("scheduleBackgroundSync($tag, $c, enabled = $enabled, unmetered = $unmetered)") Timber.d("scheduleBackgroundSync($tag, $c, enabled = $enabled, unmetered = $unmetered)")
throttle.run {
if (enabled) { if (enabled) {
val builder = PeriodicWorkRequest.Builder(c, 1, TimeUnit.HOURS) val builder = PeriodicWorkRequest.Builder(c, 1, TimeUnit.HOURS)
unmetered?.let { builder.setConstraints(getNetworkConstraints(it)) } unmetered?.let { builder.setConstraints(getNetworkConstraints(it)) }
@ -162,11 +153,10 @@ class WorkManagerImpl constructor(
workManager.cancelUniqueWork(tag) workManager.cancelUniqueWork(tag)
} }
} }
}
override suspend fun scheduleRefresh(time: Long) = enqueueUnique(TAG_REFRESH, RefreshWork::class.java, time) override fun scheduleRefresh(time: Long) = enqueueUnique(TAG_REFRESH, RefreshWork::class.java, time)
override suspend fun scheduleMidnightRefresh() = override fun scheduleMidnightRefresh() =
enqueueUnique(TAG_MIDNIGHT_REFRESH, MidnightRefreshWork::class.java, midnight()) enqueueUnique(TAG_MIDNIGHT_REFRESH, MidnightRefreshWork::class.java, midnight())
override fun scheduleNotification(scheduledTime: Long) { override fun scheduleNotification(scheduledTime: Long) {
@ -184,7 +174,7 @@ class WorkManagerImpl constructor(
} }
} }
override suspend fun scheduleBackup() = override fun scheduleBackup() =
enqueueUnique( enqueueUnique(
TAG_BACKUP, TAG_BACKUP,
BackupWork::class.java, BackupWork::class.java,
@ -193,7 +183,7 @@ class WorkManagerImpl constructor(
.millis .millis
.coerceAtMost(midnight())) .coerceAtMost(midnight()))
override suspend fun scheduleConfigRefresh() { override fun scheduleConfigRefresh() {
throttle.run { throttle.run {
workManager.enqueueUniquePeriodicWork( workManager.enqueueUniquePeriodicWork(
TAG_REMOTE_CONFIG, TAG_REMOTE_CONFIG,
@ -206,7 +196,7 @@ class WorkManagerImpl constructor(
} }
} }
override suspend fun scheduleDriveUpload(uri: Uri, purge: Boolean) { override fun scheduleDriveUpload(uri: Uri, purge: Boolean) {
if (!preferences.getBoolean(R.string.p_google_drive_backup, false)) { if (!preferences.getBoolean(R.string.p_google_drive_backup, false)) {
return return
} }
@ -228,8 +218,12 @@ class WorkManagerImpl constructor(
.setRequiredNetworkType(if (unmeteredOnly) NetworkType.UNMETERED else NetworkType.CONNECTED) .setRequiredNetworkType(if (unmeteredOnly) NetworkType.UNMETERED else NetworkType.CONNECTED)
.build() .build()
override fun cancelNotifications() {
alarmManager.cancel(notificationPendingIntent)
}
@SuppressLint("EnqueueWork") @SuppressLint("EnqueueWork")
private suspend fun enqueueUnique(key: String, c: Class<out Worker?>, time: Long) { private fun enqueueUnique(key: String, c: Class<out Worker?>, time: Long) {
val delay = time - DateUtilities.now() val delay = time - DateUtilities.now()
val builder = OneTimeWorkRequest.Builder(c) val builder = OneTimeWorkRequest.Builder(c)
if (delay > 0) { if (delay > 0) {
@ -239,9 +233,16 @@ class WorkManagerImpl constructor(
enqueue(workManager.beginUniqueWork(key, ExistingWorkPolicy.REPLACE, builder.build())) enqueue(workManager.beginUniqueWork(key, ExistingWorkPolicy.REPLACE, builder.build()))
} }
override fun cancelNotifications() { private fun enqueue(builder: WorkRequest.Builder<*, *>) {
Timber.d("cancelNotifications") throttle.run {
alarmManager.cancel(notificationPendingIntent) workManager.enqueue(builder.build())
}
}
private fun enqueue(continuation: WorkContinuation) {
throttle.run {
continuation.enqueue()
}
} }
private val notificationIntent: Intent private val notificationIntent: Intent

@ -1,25 +1,32 @@
package org.tasks.notifications package org.tasks.notifications
import kotlinx.coroutines.delay import kotlinx.coroutines.*
import org.tasks.time.DateTimeUtils.currentTimeMillis import org.tasks.time.DateTimeUtils.currentTimeMillis
import timber.log.Timber import timber.log.Timber
import java.util.concurrent.Executors.newSingleThreadExecutor
internal class Throttle constructor( internal class Throttle constructor(
ratePerPeriod: Int, ratePerPeriod: Int,
private val periodMillis: Long = 1000, private val periodMillis: Long = 1000,
private val tag: String = "", private val tag: String = "",
private val scope: CoroutineScope =
CoroutineScope(newSingleThreadExecutor().asCoroutineDispatcher() + SupervisorJob()),
private val sleeper: suspend (Long) -> Unit = { delay(it) }) { private val sleeper: suspend (Long) -> Unit = { delay(it) }) {
private val throttle: LongArray = LongArray(ratePerPeriod) private val throttle: LongArray = LongArray(ratePerPeriod)
private var oldest = 0 private var oldest = 0
@Synchronized @Synchronized
suspend fun run(runnable: suspend () -> Unit) { fun run(runnable: suspend () -> Unit): Job = scope.launch {
val sleep = throttle[oldest] - (currentTimeMillis() - periodMillis) val sleep = throttle[oldest] - (currentTimeMillis() - periodMillis)
if (sleep > 0) { if (sleep > 0) {
Timber.d("$tag: Throttled for ${sleep}ms") Timber.v("$tag: Throttled for ${sleep}ms")
sleeper.invoke(sleep) sleeper.invoke(sleep)
} }
try {
runnable.invoke() runnable.invoke()
} catch (e: Exception) {
Timber.e(e)
}
throttle[oldest] = currentTimeMillis() throttle[oldest] = currentTimeMillis()
oldest = (oldest + 1) % throttle.size oldest = (oldest + 1) % throttle.size
} }

@ -17,11 +17,11 @@ class ThrottleTest {
@Before @Before
fun setUp() { fun setUp() {
sleep = ArrayList() sleep = ArrayList()
throttle = Throttle(3) { sleep.add(it) }
} }
@Test @Test
fun dontThrottle() = runBlockingTest { fun dontThrottle() = runBlockingTest {
throttle = Throttle(3, scope = this) { sleep.add(it) }
val now = DateTimeUtils.currentTimeMillis() val now = DateTimeUtils.currentTimeMillis()
runAt(now) runAt(now)
runAt(now) runAt(now)
@ -32,6 +32,7 @@ class ThrottleTest {
@Test @Test
fun throttleForOneMillisecond() = runBlockingTest { fun throttleForOneMillisecond() = runBlockingTest {
throttle = Throttle(3, scope = this) { sleep.add(it) }
val now = DateTimeUtils.currentTimeMillis() val now = DateTimeUtils.currentTimeMillis()
runAt(now) runAt(now)
runAt(now) runAt(now)
@ -42,6 +43,7 @@ class ThrottleTest {
@Test @Test
fun throttleForOneSecond() = runBlockingTest { fun throttleForOneSecond() = runBlockingTest {
throttle = Throttle(3, scope = this) { sleep.add(it) }
val now = DateTimeUtils.currentTimeMillis() val now = DateTimeUtils.currentTimeMillis()
runAt(now) runAt(now)
runAt(now) runAt(now)
@ -52,6 +54,7 @@ class ThrottleTest {
@Test @Test
fun throttleMultiple() = runBlockingTest { fun throttleMultiple() = runBlockingTest {
throttle = Throttle(3, scope = this) { sleep.add(it) }
val now = DateTimeUtils.currentTimeMillis() val now = DateTimeUtils.currentTimeMillis()
runAt(now) runAt(now)
runAt(now + 200) runAt(now + 200)

Loading…
Cancel
Save