Implement timeForBackgroundSync(), fixed some synchronization bugs on sync thread, cleaned up a bunch of dead code from old sync service

pull/14/head
Sam Bosley 12 years ago
parent 9f8937b922
commit 7d33251a26

@ -44,7 +44,6 @@ import com.todoroo.astrid.data.Task;
import com.todoroo.astrid.data.TaskOutstanding;
import com.todoroo.astrid.data.User;
import com.todoroo.astrid.data.UserActivity;
import com.todoroo.astrid.data.UserActivityOutstanding;
public class ActFmSyncThread {
@ -83,6 +82,8 @@ public class ActFmSyncThread {
private boolean syncMigration = false;
private boolean isTimeForBackgroundSync = false;
public static enum ModelType {
TYPE_TASK,
TYPE_TAG,
@ -143,7 +144,7 @@ public class ActFmSyncThread {
}
}
public void enqueueMessage(ClientToServerMessage<?> message, Runnable callback) {
public synchronized void enqueueMessage(ClientToServerMessage<?> message, Runnable callback) {
if (!pendingMessages.contains(message)) {
pendingMessages.add(message);
if (callback != null)
@ -154,10 +155,18 @@ public class ActFmSyncThread {
}
}
public synchronized void setTimeForBackgroundSync(boolean isTimeForBackgroundSync) {
this.isTimeForBackgroundSync = isTimeForBackgroundSync;
if (isTimeForBackgroundSync)
synchronized (monitor) {
monitor.notifyAll();
}
}
@SuppressWarnings("nls")
private void sync() {
try {
int batchSize = 1;
int batchSize = 3;
List<ClientToServerMessage<?>> messageBatch = new LinkedList<ClientToServerMessage<?>>();
while(true) {
synchronized(monitor) {
@ -174,21 +183,22 @@ public class ActFmSyncThread {
}
}
// Stuff in the document
boolean refreshAfterBatch = false;
if (timeForBackgroundSync()) {
enqueueMessage(BriefMe.instantiateBriefMeForClass(Task.class, NameMaps.PUSHED_AT_TASKS), null);
enqueueMessage(BriefMe.instantiateBriefMeForClass(TagData.class, NameMaps.PUSHED_AT_TAGS), null);
enqueueMessage(BriefMe.instantiateBriefMeForClass(User.class, NameMaps.PUSHED_AT_USERS), null);
repopulateQueueFromOutstandingTables();
refreshAfterBatch = true;
setTimeForBackgroundSync(false);
}
while (messageBatch.size() < batchSize && !pendingMessages.isEmpty()) {
ClientToServerMessage<?> message = pendingMessages.remove(0);
if (message != null)
messageBatch.add(message);
}
boolean refreshAfterBatch = false;
if (messageBatch.isEmpty() && timeForBackgroundSync()) {
messageBatch.add(BriefMe.instantiateBriefMeForClass(Task.class, NameMaps.PUSHED_AT_TASKS));
messageBatch.add(BriefMe.instantiateBriefMeForClass(TagData.class, NameMaps.PUSHED_AT_TAGS));
messageBatch.add(BriefMe.instantiateBriefMeForClass(User.class, NameMaps.PUSHED_AT_USERS));
refreshAfterBatch = true;
}
if (!messageBatch.isEmpty() && checkForToken()) {
JSONArray payload = new JSONArray();
for (ClientToServerMessage<?> message : messageBatch) {
@ -261,11 +271,10 @@ public class ActFmSyncThread {
private void replayOutstandingChanges(boolean afterErrors) {
new ReplayOutstandingEntries<Task, TaskOutstanding>(Task.class, NameMaps.TABLE_ID_TASKS, taskDao, taskOutstandingDao, this, afterErrors).execute();
new ReplayOutstandingEntries<TagData, TagOutstanding>(TagData.class, NameMaps.TABLE_ID_TAGS, tagDataDao, tagOutstandingDao, this, afterErrors).execute();
new ReplayOutstandingEntries<UserActivity, UserActivityOutstanding>(UserActivity.class, NameMaps.TABLE_ID_USER_ACTIVITY, userActivityDao, userActivityOutstandingDao, this, afterErrors).execute();
}
private boolean timeForBackgroundSync() {
return false; // TODO: replace false with a real background sync condition
return isTimeForBackgroundSync;
}
private void repopulateQueueFromOutstandingTables() {
@ -280,9 +289,7 @@ public class ActFmSyncThread {
for (outstanding.moveToFirst(); !outstanding.isAfterLast(); outstanding.moveToNext()) {
Long id = outstanding.get(OutstandingEntry.ENTITY_ID_PROPERTY);
ChangesHappened<T, OE> ch = new ChangesHappened<T, OE>(id, modelClass, modelDao, oustandingDao);
if (!pendingMessages.contains(ch)) {
pendingMessages.add(ch);
}
enqueueMessage(ch, null);
}
} finally {
outstanding.close();

@ -6,46 +6,25 @@
package com.todoroo.astrid.actfm.sync;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.json.JSONException;
import org.json.JSONObject;
import android.content.Intent;
import com.timsu.astrid.GCMIntentService;
import com.timsu.astrid.R;
import com.todoroo.andlib.data.AbstractModel;
import com.todoroo.andlib.data.TodorooCursor;
import com.todoroo.andlib.service.Autowired;
import com.todoroo.andlib.service.ContextManager;
import com.todoroo.andlib.sql.Criterion;
import com.todoroo.andlib.sql.Query;
import com.todoroo.andlib.utility.Preferences;
import com.todoroo.astrid.actfm.sync.messages.BriefMe;
import com.todoroo.astrid.actfm.sync.messages.NameMaps;
import com.todoroo.astrid.api.AstridApiConstants;
import com.todoroo.astrid.billing.BillingConstants;
import com.todoroo.astrid.dao.Database;
import com.todoroo.astrid.dao.MetadataDao.MetadataCriteria;
import com.todoroo.astrid.dao.UserDao;
import com.todoroo.astrid.data.Metadata;
import com.todoroo.astrid.data.TagData;
import com.todoroo.astrid.data.Task;
import com.todoroo.astrid.data.User;
import com.todoroo.astrid.files.FileMetadata;
import com.todoroo.astrid.service.AstridDependencyInjector;
import com.todoroo.astrid.service.MetadataService;
import com.todoroo.astrid.service.TagDataService;
import com.todoroo.astrid.service.TaskService;
import com.todoroo.astrid.subtasks.SubtasksUpdater;
import com.todoroo.astrid.sync.SyncResultCallback;
import com.todoroo.astrid.sync.SyncV2Provider;
import com.todoroo.astrid.tags.TagService;
import com.todoroo.astrid.tags.TaskToTagMetadata;
/**
* Exposes sync action
@ -53,8 +32,6 @@ import com.todoroo.astrid.tags.TaskToTagMetadata;
*/
public class ActFmSyncV2Provider extends SyncV2Provider {
private static final int NUM_THREADS = 20;
@Autowired ActFmPreferenceService actFmPreferenceService;
@Autowired ActFmSyncService actFmSyncService;
@ -69,68 +46,6 @@ public class ActFmSyncV2Provider extends SyncV2Provider {
@Autowired Database database;
private final PushQueuedArgs<Task> taskPusher = new PushQueuedArgs<Task>() {
@Override
public Task getRemoteModelInstance(TodorooCursor<Task> cursor) {
return new Task(cursor);
}
@Override
public void pushRemoteModel(Task model) {
// long userId = model.getValue(Task.USER_ID);
// if (userId != Task.USER_ID_SELF && userId != ActFmPreferenceService.userId())
// model.putTransitory(TaskService.TRANS_ASSIGNED, true);
// actFmSyncService.pushTaskOnSave(model, model.getMergedValues());
}
};
private final PushQueuedArgs<TagData> tagPusher = new PushQueuedArgs<TagData>() {
@Override
public void pushRemoteModel(TagData model) {
actFmSyncService.pushTagDataOnSave(model, model.getMergedValues());
}
@Override
public TagData getRemoteModelInstance(
TodorooCursor<TagData> cursor) {
return new TagData(cursor);
}
};
private final PushQueuedArgs<User> userPusher = new PushQueuedArgs<User>() {
@Override
public User getRemoteModelInstance(TodorooCursor<User> cursor) {
return new User(cursor);
}
@Override
public void pushRemoteModel(User model) {
// actFmSyncService.pushUser(model);
}
};
private final PushQueuedArgs<Metadata> filesPusher = new PushQueuedArgs<Metadata>() {
@Override
public void pushRemoteModel(Metadata model) {
// long taskId = model.getValue(Metadata.TASK);
// Task localTask = taskService.fetchById(taskId, Task.REMOTE_ID);
// long remoteTaskId = localTask.getValue(Task.REMOTE_ID);
//
// if (model.getValue(FileMetadata.DELETION_DATE) > 0)
// actFmSyncService.deleteAttachment(model);
// else if (remoteTaskId > 0)
// actFmSyncService.pushAttachment(remoteTaskId, model);
};
public Metadata getRemoteModelInstance(TodorooCursor<Metadata> cursor) {
return new Metadata(cursor);
}
};
static {
AstridDependencyInjector.initialize();
}
@ -158,12 +73,8 @@ public class ActFmSyncV2Provider extends SyncV2Provider {
return actFmPreferenceService.isLoggedIn();
}
private static final String LAST_TAG_FETCH_TIME = "actfm_lastTag"; //$NON-NLS-1$
private static final String LAST_FEATURED_TAG_FETCH_TIME = "actfm_last_featuredTag"; //$NON-NLS-1$
private static final String LAST_USERS_FETCH_TIME = "actfm_lastUsers"; //$NON-NLS-1$
// --- synchronize active tasks
@Override
@ -177,31 +88,12 @@ public class ActFmSyncV2Provider extends SyncV2Provider {
final AtomicInteger finisher = new AtomicInteger(1);
actFmPreferenceService.recordSyncStart();
updateUserStatus();
Runnable refreshCallback = new Runnable() {
@Override
public void run() {
Intent refresh = new Intent(AstridApiConstants.BROADCAST_EVENT_REFRESH);
ContextManager.getContext().sendBroadcast(refresh);
}
};
ActFmSyncThread.getInstance().enqueueMessage(BriefMe.instantiateBriefMeForClass(Task.class, NameMaps.PUSHED_AT_TASKS), null);
ActFmSyncThread.getInstance().enqueueMessage(BriefMe.instantiateBriefMeForClass(TagData.class, NameMaps.PUSHED_AT_TAGS), null);
ActFmSyncThread.getInstance().enqueueMessage(BriefMe.instantiateBriefMeForClass(User.class, NameMaps.PUSHED_AT_USERS), refreshCallback);
ActFmSyncThread.getInstance().setTimeForBackgroundSync(true);
// startUsersSync(callback, finisher);
//
// startTagFetcher(callback, finisher);
//
// startUpdatesFetcher(manual, callback, finisher);
//
startFeaturedListFetcher(callback, finisher);
// actFmSyncService.waitUntilEmpty();
// startTaskFetcher(manual, callback, finisher);
callback.incrementProgress(50);
}
}).start();
@ -241,70 +133,6 @@ public class ActFmSyncV2Provider extends SyncV2Provider {
}
}
/** fetch changes to users/friends */
private void startUsersSync(final SyncResultCallback callback,
final AtomicInteger finisher) {
new Thread(new Runnable() {
@Override
public void run() {
// int time = Preferences.getInt(LAST_USERS_FETCH_TIME, 0);
// try {
// pushQueuedUsers(callback, finisher);
// time = actFmSyncService.fetchUsers();
// Preferences.setInt(LAST_USERS_FETCH_TIME, time);
// } catch (JSONException e) {
// handler.handleException("actfm-sync", e, e.toString()); //$NON-NLS-1$
// } catch (IOException e) {
// handler.handleException("actfm-sync", e, e.toString()); //$NON-NLS-1$
// } finally {
// callback.incrementProgress(20);
// if(finisher.decrementAndGet() == 0) {
// finishSync(callback);
// }
// }
}
}).start();
}
/** fetch changes to tags */
private void startTagFetcher(final SyncResultCallback callback,
final AtomicInteger finisher) {
new Thread(new Runnable() {
@Override
public void run() {
int time = Preferences.getInt(LAST_TAG_FETCH_TIME, 0);
try {
pushQueuedTags(callback, finisher, time);
time = actFmSyncService.fetchTags(time);
Preferences.setInt(LAST_TAG_FETCH_TIME, time);
} catch (JSONException e) {
handler.handleException("actfm-sync", e, e.toString()); //$NON-NLS-1$
} catch (IOException e) {
handler.handleException("actfm-sync", e, e.toString()); //$NON-NLS-1$
} finally {
callback.incrementProgress(20);
if(finisher.decrementAndGet() == 0) {
finishSync(callback);
}
}
}
}).start();
}
/** fetch changes to personal updates and push unpushed updates */
private void startUpdatesFetcher(final boolean manual, final SyncResultCallback callback,
final AtomicInteger finisher) {
// actFmSyncService.fetchPersonalUpdates(manual, new Runnable() { // Also pushes queued updates
// @Override
// public void run() {
// callback.incrementProgress(20);
// if (finisher.decrementAndGet() == 0) {
// finishSync(callback);
// }
// }
// });
}
/** fetch changes to tags */
private void startFeaturedListFetcher(final SyncResultCallback callback,
final AtomicInteger finisher) {
@ -331,294 +159,10 @@ public class ActFmSyncV2Provider extends SyncV2Provider {
}).start();
}
/** @return runnable to fetch changes to tags */
private void startTaskFetcher(final boolean manual, final SyncResultCallback callback,
final AtomicInteger finisher) {
final boolean pushActiveTasksOrder = actFmSyncService.cancelFilterOrderingPush(SubtasksUpdater.ACTIVE_TASKS_ORDER) && manual;
final boolean pushTodayOrder = actFmSyncService.cancelFilterOrderingPush(SubtasksUpdater.TODAY_TASKS_ORDER) && manual;
actFmSyncService.fetchActiveTasks(manual, handler, new Runnable() {
@Override
public void run() {
pushQueuedTasks(callback, finisher);
if (pushActiveTasksOrder)
actFmSyncService.pushFilterOrderingImmediately(SubtasksUpdater.ACTIVE_TASKS_ORDER);
else
actFmSyncService.fetchFilterOrder(SubtasksUpdater.ACTIVE_TASKS_ORDER);
if (pushTodayOrder)
actFmSyncService.pushFilterOrderingImmediately(SubtasksUpdater.TODAY_TASKS_ORDER);
else
actFmSyncService.fetchFilterOrder(SubtasksUpdater.TODAY_TASKS_ORDER);
callback.incrementProgress(30);
if(finisher.decrementAndGet() == 0) {
finishSync(callback);
}
}
});
}
private static interface PushQueuedArgs<T extends AbstractModel> {
public T getRemoteModelInstance(TodorooCursor<T> cursor);
public void pushRemoteModel(T model);
}
private <T extends AbstractModel> void pushQueued(final SyncResultCallback callback, final AtomicInteger finisher,
TodorooCursor<T> cursor, boolean awaitTermination, final PushQueuedArgs<T> pusher) {
callback.incrementMax(cursor.getCount() * 20);
finisher.addAndGet(cursor.getCount());
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
for(int i = 0; i < cursor.getCount(); i++) {
cursor.moveToNext();
final T model = pusher.getRemoteModelInstance(cursor);
executor.submit(new Runnable() {
public void run() {
try {
pusher.pushRemoteModel(model);
} finally {
callback.incrementProgress(20);
if(finisher.decrementAndGet() == 0) {
finishSync(callback);
}
}
}
});
}
executor.shutdown();
if (awaitTermination) {
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private void pushQueuedTasks(final SyncResultCallback callback,
final AtomicInteger finisher) {
// TodorooCursor<Task> taskCursor;
// Query query = Query.select(Task.PROPERTIES).
// where(Criterion.or(
// Criterion.and(TaskCriteria.isActive(),
// Task.REMOTE_ID.isNull()),
// Criterion.and(Task.REMOTE_ID.isNotNull(),
// Task.MODIFICATION_DATE.gt(Task.LAST_SYNC))));
// taskCursor = taskService.query(query);
//
// try {
// pushQueued(callback, finisher, taskCursor, true, taskPusher);
// } finally {
// taskCursor.close();
// }
//
// if (ActFmPreferenceService.isPremiumUser()) {
// TodorooCursor<Metadata> filesCursor = metadataService.query(Query.select(Metadata.PROPERTIES)
// .where(Criterion.and(
// MetadataCriteria.withKey(FileMetadata.METADATA_KEY),
// Criterion.or(FileMetadata.REMOTE_ID.eq(0), FileMetadata.DELETION_DATE.gt(0)))));
// try {
// pushQueued(callback, finisher, filesCursor, false, filesPusher);
// } finally {
// filesCursor.close();
// }
// }
}
private void pushQueuedTags(final SyncResultCallback callback,
final AtomicInteger finisher, int lastTagSyncTime) {
// TodorooCursor<TagData> tagDataCursor = tagDataService.query(Query.select(TagData.PROPERTIES)
// .where(Criterion.and(
// Functions.bitwiseAnd(TagData.FLAGS, TagData.FLAG_FEATURED).eq(0),
// Criterion.or(
// TagData.REMOTE_ID.eq(0),
// Criterion.and(TagData.REMOTE_ID.gt(0),
// TagData.MODIFICATION_DATE.gt(lastTagSyncTime))))));
// try {
// pushQueued(callback, finisher, tagDataCursor, true, tagPusher);
// } finally {
// tagDataCursor.close();
// }
}
// private void pushQueuedUsers(final SyncResultCallback callback,
// final AtomicInteger finisher) {
// TodorooCursor<User> users = userDao.query(Query.select(User.PROPERTIES).where(
// Criterion.and(User.PENDING_STATUS.isNotNull(), Functions.length(User.PENDING_STATUS).gt(0))));
// try {
// pushQueued(callback, finisher, users, true, userPusher);
// } finally {
// users.close();
// }
// }
// --- synchronize list
@Override
public void synchronizeList(Object list, final boolean manual,
final SyncResultCallback callback) {
// if (list instanceof User) {
// synchronizeUser((User) list, manual, callback);
// return;
// }
//
// if(!(list instanceof TagData))
// return;
//
// final TagData tagData = (TagData) list;
// final boolean noRemoteId = tagData.getValue(TagData.REMOTE_ID) == 0;
//
// new Thread(new Runnable() {
// public void run() {
// callback.started();
// callback.incrementMax(100);
//
// final AtomicInteger finisher = new AtomicInteger(3);
//
// fetchTagData(tagData, noRemoteId, manual, callback, finisher);
//
// if(!noRemoteId) {
// boolean orderPushQueued = actFmSyncService.cancelTagOrderingPush(tagData.getId()) && manual;
// actFmSyncService.waitUntilEmpty();
// fetchTasksForTag(tagData, manual, orderPushQueued, callback, finisher);
// fetchUpdatesForTag(tagData, manual, callback, finisher);
// }
//
// callback.incrementProgress(50);
// }
// }).start();
}
private void synchronizeUser(final User user, final boolean manual, final SyncResultCallback callback) {
// if (user.getValue(User.REMOTE_ID) == 0)
// return;
//
// new Thread(new Runnable() {
// @Override
// public void run() {
// callback.started();
// callback.incrementMax(100);
//
// actFmSyncService.waitUntilEmpty();
// actFmSyncService.fetchTasksForUser(user, manual, new Runnable() {
// public void run() {
// callback.finished();
// }
// });
// callback.incrementProgress(50);
// }
// }).start();
}
private void fetchTagData(final TagData tagData, final boolean noRemoteId,
final boolean manual, final SyncResultCallback callback,
final AtomicInteger finisher) {
new Thread(new Runnable() {
@Override
public void run() {
String oldName = tagData.getValue(TagData.NAME);
try {
actFmSyncService.fetchTag(tagData);
if(noRemoteId) {
fetchTasksForTag(tagData, manual, true, callback, finisher);
fetchUpdatesForTag(tagData, manual, callback, finisher);
}
if(!oldName.equals(tagData.getValue(TagData.NAME))) {
TagService.getInstance().rename(oldName,
tagData.getValue(TagData.NAME));
}
} catch (IOException e) {
exceptionService.reportError("sync-io", e); //$NON-NLS-1$
} catch (JSONException e) {
exceptionService.reportError("sync-json", e); //$NON-NLS-1$
} finally {
callback.incrementProgress(20);
if(finisher.decrementAndGet() == 0)
callback.finished();
}
}
}).start();
}
private void fetchUpdatesForTag(final TagData tagData, boolean manual, final SyncResultCallback callback,
final AtomicInteger finisher) {
// actFmSyncService.fetchUpdatesForTag(tagData, manual, new Runnable() {
// @Override
// public void run() {
// callback.incrementProgress(20);
// if(finisher.decrementAndGet() == 0)
// callback.finished();
// }
// });
}
private void fetchTasksForTag(final TagData tagData, boolean manual, final boolean pushOrder, final SyncResultCallback callback,
final AtomicInteger finisher) {
actFmSyncService.fetchTasksForTag(tagData, manual, new Runnable() {
@Override
public void run() {
pushQueuedTasksByTag(tagData, callback, finisher);
if (pushOrder)
actFmSyncService.pushTagOrderingImmediately(tagData);
else
actFmSyncService.fetchTagOrder(tagData);
callback.incrementProgress(30);
if(finisher.decrementAndGet() == 0)
callback.finished();
}
});
}
private void pushQueuedTasksByTag(TagData tagData, SyncResultCallback callback, AtomicInteger finisher) {
Long[] ids;
TodorooCursor<Metadata> allTagged = metadataService.query(Query.select(Metadata.TASK).where(Criterion.and(Metadata.KEY.eq(TaskToTagMetadata.KEY),
TaskToTagMetadata.TAG_NAME.eqCaseInsensitive(tagData.getValue(TagData.NAME)))));
try {
ids = new Long[allTagged.getCount()];
Metadata m = new Metadata();
int i = 0;
for (allTagged.moveToFirst(); !allTagged.isAfterLast(); allTagged.moveToNext()) {
m.readFromCursor(allTagged);
ids[i] = m.getValue(Metadata.TASK);
i++;
}
} finally {
allTagged.close();
}
// TodorooCursor<Task> taskCursor = taskService.query(Query.select(Task.PROPERTIES)
// .join(Join.inner(Metadata.TABLE, Criterion.and(Metadata.KEY.eq(TaskToTagMetadata.KEY), Metadata.TASK.eq(Task.ID),
// TaskToTagMetadata.TAG_NAME.eqCaseInsensitive(tagData.getValue(TagData.NAME)))))
// .where(Criterion.or(
// Criterion.and(TaskCriteria.isActive(),
// Task.REMOTE_ID.isNull()),
// Criterion.and(Task.REMOTE_ID.isNotNull(),
// Task.MODIFICATION_DATE.gt(Task.LAST_SYNC)))));
// try {
// pushQueued(callback, finisher, taskCursor, true, taskPusher);
// } finally {
// taskCursor.close();
// }
TodorooCursor<Metadata> filesCursor = metadataService.query(Query.select(Metadata.PROPERTIES)
.where(Criterion.and(
MetadataCriteria.withKey(FileMetadata.METADATA_KEY),
FileMetadata.REMOTE_ID.eq(0),
Metadata.TASK.in(ids))));
try {
pushQueued(callback, finisher, filesCursor, false, filesPusher);
} finally {
filesCursor.close();
}
// Nothing to do
}
}

@ -66,6 +66,8 @@ public abstract class ServerToClientMessage {
return new NowBriefed<TagData>(json, PluginServices.getTagDataDao());
else if (NameMaps.TABLE_ID_USER_ACTIVITY.equals(table))
return new NowBriefed<UserActivity>(json, PluginServices.getUserActivityDao());
else if (NameMaps.TABLE_ID_USERS.equals(table))
return new NowBriefed<User>(json, PluginServices.getUserDao());
else
return null;
}

Loading…
Cancel
Save