Use a map from messages to callbacks instead of two parallel queues. This should resolve some timing bugs--callbacks will only be called in the same loop that their message ocurrs

pull/14/head
Sam Bosley 13 years ago
parent 1bb70597e2
commit 5b200ec849

@ -2,8 +2,10 @@ package com.todoroo.astrid.actfm.sync;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONObject; import org.json.JSONObject;
@ -48,7 +50,7 @@ public class ActFmSyncThread {
private static final String ERROR_TAG = "actfm-sync-thread"; //$NON-NLS-1$ private static final String ERROR_TAG = "actfm-sync-thread"; //$NON-NLS-1$
private final List<ClientToServerMessage<?>> pendingMessages; private final List<ClientToServerMessage<?>> pendingMessages;
private final List<Runnable> pendingCallbacks; private final Map<ClientToServerMessage<?>, Runnable> pendingCallbacks;
private final Object monitor; private final Object monitor;
private Thread thread; private Thread thread;
@ -102,10 +104,9 @@ public class ActFmSyncThread {
synchronized(ActFmSyncThread.class) { synchronized(ActFmSyncThread.class) {
if (instance == null) { if (instance == null) {
List<ClientToServerMessage<?>> syncQueue = Collections.synchronizedList(new LinkedList<ClientToServerMessage<?>>()); List<ClientToServerMessage<?>> syncQueue = Collections.synchronizedList(new LinkedList<ClientToServerMessage<?>>());
List<Runnable> callbackQueue = Collections.synchronizedList(new LinkedList<Runnable>());
ActFmSyncMonitor monitor = ActFmSyncMonitor.getInstance(); ActFmSyncMonitor monitor = ActFmSyncMonitor.getInstance();
instance = new ActFmSyncThread(syncQueue, callbackQueue, monitor); instance = new ActFmSyncThread(syncQueue, monitor);
taskDao.addListener(new SyncDatabaseListener<Task>(instance, ModelType.TYPE_TASK)); taskDao.addListener(new SyncDatabaseListener<Task>(instance, ModelType.TYPE_TASK));
tagDataDao.addListener(new SyncDatabaseListener<TagData>(instance, ModelType.TYPE_TAG)); tagDataDao.addListener(new SyncDatabaseListener<TagData>(instance, ModelType.TYPE_TAG));
@ -118,10 +119,10 @@ public class ActFmSyncThread {
return instance; return instance;
} }
private ActFmSyncThread(List<ClientToServerMessage<?>> messageQueue, List<Runnable> callbackQueue, Object syncMonitor) { private ActFmSyncThread(List<ClientToServerMessage<?>> messageQueue, Object syncMonitor) {
DependencyInjectionService.getInstance().inject(this); DependencyInjectionService.getInstance().inject(this);
this.pendingMessages = messageQueue; this.pendingMessages = messageQueue;
this.pendingCallbacks = callbackQueue; this.pendingCallbacks = Collections.synchronizedMap(new HashMap<ClientToServerMessage<?>, Runnable>());
this.monitor = syncMonitor; this.monitor = syncMonitor;
} }
@ -142,7 +143,7 @@ public class ActFmSyncThread {
if (!pendingMessages.contains(message)) { if (!pendingMessages.contains(message)) {
pendingMessages.add(message); pendingMessages.add(message);
if (callback != null) if (callback != null)
pendingCallbacks.add(callback); pendingCallbacks.put(message, callback);
synchronized(monitor) { synchronized(monitor) {
monitor.notifyAll(); monitor.notifyAll();
} }
@ -154,7 +155,6 @@ public class ActFmSyncThread {
try { try {
int batchSize = 1; int batchSize = 1;
List<ClientToServerMessage<?>> messageBatch = new LinkedList<ClientToServerMessage<?>>(); List<ClientToServerMessage<?>> messageBatch = new LinkedList<ClientToServerMessage<?>>();
List<Runnable> callbackBatch = new LinkedList<Runnable>();
while(true) { while(true) {
synchronized(monitor) { synchronized(monitor) {
while ((pendingMessages.isEmpty() && !timeForBackgroundSync()) || !actFmPreferenceService.isLoggedIn()) { while ((pendingMessages.isEmpty() && !timeForBackgroundSync()) || !actFmPreferenceService.isLoggedIn()) {
@ -174,21 +174,11 @@ public class ActFmSyncThread {
messageBatch.add(message); messageBatch.add(message);
} }
while (callbackBatch.size() < batchSize && !pendingCallbacks.isEmpty()) { boolean refreshAfterBatch = false;
Runnable callback = pendingCallbacks.remove(0);
callbackBatch.add(callback);
}
if (messageBatch.isEmpty() && timeForBackgroundSync()) { if (messageBatch.isEmpty() && timeForBackgroundSync()) {
messageBatch.add(BriefMe.instantiateBriefMeForClass(Task.class, NameMaps.PUSHED_AT_TASKS)); messageBatch.add(BriefMe.instantiateBriefMeForClass(Task.class, NameMaps.PUSHED_AT_TASKS));
messageBatch.add(BriefMe.instantiateBriefMeForClass(TagData.class, NameMaps.PUSHED_AT_TAGS)); messageBatch.add(BriefMe.instantiateBriefMeForClass(TagData.class, NameMaps.PUSHED_AT_TAGS));
callbackBatch.add(new Runnable() { refreshAfterBatch = true;
@Override
public void run() {
Intent refresh = new Intent(AstridApiConstants.BROADCAST_EVENT_REFRESH);
ContextManager.getContext().sendBroadcast(refresh);
}
});
} }
if (!messageBatch.isEmpty() && checkForToken()) { if (!messageBatch.isEmpty() && checkForToken()) {
@ -231,12 +221,18 @@ public class ActFmSyncThread {
Log.e(ERROR_TAG, "IOException", e); Log.e(ERROR_TAG, "IOException", e);
batchSize = Math.max(batchSize / 2, 1); batchSize = Math.max(batchSize / 2, 1);
} }
for (Runnable r : callbackBatch) {
r.run(); for (ClientToServerMessage<?> message : messageBatch) {
Runnable r = pendingCallbacks.remove(message);
if (r != null)
r.run();
}
if (refreshAfterBatch) {
Intent refresh = new Intent(AstridApiConstants.BROADCAST_EVENT_REFRESH);
ContextManager.getContext().sendBroadcast(refresh);
} }
messageBatch = new LinkedList<ClientToServerMessage<?>>(); messageBatch = new LinkedList<ClientToServerMessage<?>>();
callbackBatch = new LinkedList<Runnable>();
} }
} }
} catch (Exception e) { } catch (Exception e) {

Loading…
Cancel
Save