diff --git a/astrid/plugin-src/com/todoroo/astrid/actfm/sync/ActFmSyncV2Provider.java b/astrid/plugin-src/com/todoroo/astrid/actfm/sync/ActFmSyncV2Provider.java index c3acb0747..0ff99ad7c 100644 --- a/astrid/plugin-src/com/todoroo/astrid/actfm/sync/ActFmSyncV2Provider.java +++ b/astrid/plugin-src/com/todoroo/astrid/actfm/sync/ActFmSyncV2Provider.java @@ -4,6 +4,9 @@ package com.todoroo.astrid.actfm.sync; import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.json.JSONException; @@ -17,9 +20,11 @@ import com.todoroo.andlib.sql.Criterion; import com.todoroo.andlib.sql.Query; import com.todoroo.andlib.utility.Preferences; import com.todoroo.astrid.dao.TaskDao.TaskCriteria; +import com.todoroo.astrid.data.RemoteModel; import com.todoroo.astrid.data.TagData; import com.todoroo.astrid.data.Task; import com.todoroo.astrid.service.AstridDependencyInjector; +import com.todoroo.astrid.service.TagDataService; import com.todoroo.astrid.service.TaskService; import com.todoroo.astrid.sync.SyncResultCallback; import com.todoroo.astrid.sync.SyncV2Provider; @@ -31,12 +36,43 @@ import com.todoroo.astrid.tags.TagService; */ public class ActFmSyncV2Provider extends SyncV2Provider { + private static final int NUM_THREADS = 20; + @Autowired ActFmPreferenceService actFmPreferenceService; @Autowired ActFmSyncService actFmSyncService; @Autowired TaskService taskService; + @Autowired TagDataService tagDataService; + + private final PushQueuedArgs taskPusher = new PushQueuedArgs() { + @Override + public Task getRemoteModelInstance(TodorooCursor cursor) { + return new Task(cursor); + } + + @Override + public void pushRemoteModel(Task model) { + actFmSyncService.pushTaskOnSave(model, model.getMergedValues()); + } + + }; + + private final PushQueuedArgs tagPusher = new PushQueuedArgs() { + + @Override + public void pushRemoteModel(TagData model) { + actFmSyncService.pushTagDataOnSave(model, model.getMergedValues()); + } + + @Override + public TagData getRemoteModelInstance( + TodorooCursor cursor) { + return new TagData(cursor); + } + }; + static { AstridDependencyInjector.initialize(); } @@ -91,6 +127,7 @@ public class ActFmSyncV2Provider extends SyncV2Provider { public void run() { int time = Preferences.getInt(LAST_TAG_FETCH_TIME, 0); try { + pushQueuedTags(callback, finisher, time); time = actFmSyncService.fetchTags(time); Preferences.setInt(LAST_TAG_FETCH_TIME, time); } catch (JSONException e) { @@ -114,7 +151,7 @@ public class ActFmSyncV2Provider extends SyncV2Provider { actFmSyncService.fetchActiveTasks(manual, handler, new Runnable() { @Override public void run() { - pushQueued(callback, finisher); + pushQueuedTasks(callback, finisher); callback.incrementProgress(30); if(finisher.decrementAndGet() == 0) { @@ -125,27 +162,26 @@ public class ActFmSyncV2Provider extends SyncV2Provider { }); } - private void pushQueued(final SyncResultCallback callback, - final AtomicInteger finisher) { - TodorooCursor cursor = taskService.query(Query.select(Task.PROPERTIES). - where(Criterion.or( - Criterion.and(TaskCriteria.isActive(), - Task.REMOTE_ID.eq(0)), - Criterion.and(Task.REMOTE_ID.gt(0), - Task.MODIFICATION_DATE.gt(Task.LAST_SYNC))))); + private static interface PushQueuedArgs { + public T getRemoteModelInstance(TodorooCursor cursor); + public void pushRemoteModel(T model); + } + private void pushQueued(final SyncResultCallback callback, final AtomicInteger finisher, + TodorooCursor cursor, boolean awaitTermination, final PushQueuedArgs pusher) { try { callback.incrementMax(cursor.getCount() * 20); finisher.addAndGet(cursor.getCount()); + ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); for(int i = 0; i < cursor.getCount(); i++) { cursor.moveToNext(); - final Task task = new Task(cursor); + final T model = pusher.getRemoteModelInstance(cursor); - new Thread(new Runnable() { + executor.submit(new Runnable() { public void run() { try { - actFmSyncService.pushTaskOnSave(task, task.getMergedValues()); + pusher.pushRemoteModel(model); } finally { callback.incrementProgress(20); if(finisher.decrementAndGet() == 0) { @@ -154,13 +190,44 @@ public class ActFmSyncV2Provider extends SyncV2Provider { } } } - }).start(); + }); } + executor.shutdown(); + if (awaitTermination) + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } finally { cursor.close(); } } + private void pushQueuedTasks(final SyncResultCallback callback, + final AtomicInteger finisher) { + TodorooCursor taskCursor = taskService.query(Query.select(Task.PROPERTIES). + where(Criterion.or( + Criterion.and(TaskCriteria.isActive(), + Task.REMOTE_ID.eq(0)), + Criterion.and(Task.REMOTE_ID.gt(0), + Task.MODIFICATION_DATE.gt(Task.LAST_SYNC))))); + + pushQueued(callback, finisher, taskCursor, false, taskPusher); + } + + private void pushQueuedTags(final SyncResultCallback callback, + final AtomicInteger finisher, int lastTagSyncTime) { + TodorooCursor tagDataCursor = tagDataService.query(Query.select(TagData.PROPERTIES) + .where(Criterion.or( + TagData.REMOTE_ID.eq(0), + Criterion.and(TagData.REMOTE_ID.gt(0), + TagData.MODIFICATION_DATE.gt(lastTagSyncTime))))); + + pushQueued(callback, finisher, tagDataCursor, true, tagPusher); + + } + // --- synchronize list @Override @@ -173,9 +240,6 @@ public class ActFmSyncV2Provider extends SyncV2Provider { TagData tagData = (TagData) list; final boolean noRemoteId = tagData.getValue(TagData.REMOTE_ID) == 0; - if(noRemoteId && !manual) - return; - callback.started(); callback.incrementMax(100);