diff --git a/src/main/java/com/mixpanel/android/mpmetrics/AnalyticsMessages.java b/src/main/java/com/mixpanel/android/mpmetrics/AnalyticsMessages.java index 71435788..889ac189 100644 --- a/src/main/java/com/mixpanel/android/mpmetrics/AnalyticsMessages.java +++ b/src/main/java/com/mixpanel/android/mpmetrics/AnalyticsMessages.java @@ -7,6 +7,7 @@ import android.os.Looper; import android.os.Message; import android.os.Process; +import android.os.Bundle; import android.util.DisplayMetrics; import com.mixpanel.android.util.Base64Coder; @@ -26,6 +27,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.CountDownLatch; import javax.net.ssl.SSLSocketFactory; @@ -117,13 +119,44 @@ public void clearAnonymousUpdatesMessage(final MixpanelDescription clearAnonymou public void postToServer(final MixpanelDescription flushDescription) { final Message m = Message.obtain(); + Bundle data = new Bundle(); + data.putString("token", flushDescription.getToken()); + m.setData(data); m.what = FLUSH_QUEUE; - m.obj = flushDescription.getToken(); m.arg1 = 0; + m.arg2 = 0; mWorker.runMessage(m); } + public Integer postToServer(final MixpanelDescription flushDescription, boolean sync) { + if (!sync) { + postToServer(flushDescription); + return -1; + } + + final Message m = Message.obtain(); + Bundle data = new Bundle(); + data.putString("token", flushDescription.getToken()); + m.setData(data); + m.what = FLUSH_QUEUE; + m.obj = new CountDownLatch(1); + m.arg1 = 0; + m.arg2 = 1; + + mWorker.runMessage(m); + Integer resultCode = data.getInt("returnCode"); + + logAboutMessageToMixpanel("Status Code Main Thread: " + resultCode); + logAboutMessageToMixpanel("bundle accessing from post to server: " + resultCode); + + try { + return resultCode.intValue(); + } catch(NullPointerException e) { + return -1; + } + } + public void emptyTrackingQueues(final MixpanelDescription mixpanelDescription) { final Message m = Message.obtain(); m.what = EMPTY_QUEUES; @@ -341,6 +374,14 @@ public void runMessage(Message msg) { logAboutMessageToMixpanel("Dead mixpanel worker dropping a message: " + msg.what); } else { mHandler.sendMessage(msg); + if (msg.arg2 != 0) { + try { + CountDownLatch flushLatch = (CountDownLatch) msg.obj; + flushLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } } } @@ -416,8 +457,12 @@ public void handleMessage(Message msg) { } else if (msg.what == FLUSH_QUEUE) { logAboutMessageToMixpanel("Flushing queue due to scheduled or forced flush"); updateFlushFrequency(); - token = (String) msg.obj; - sendAllData(mDbAdapter, token); + token = msg.getData().getString("token"); + int res = sendAllData(mDbAdapter, token); + if (msg.arg2 != 0) { + msg.getData().putInt("returnCode", res); + ((CountDownLatch) msg.obj).countDown(); + } } else if (msg.what == EMPTY_QUEUES) { final MixpanelDescription message = (MixpanelDescription) msg.obj; token = message.getToken(); @@ -478,19 +523,23 @@ protected long getTrackEngageRetryAfter() { return mTrackEngageRetryAfter; } - private void sendAllData(MPDbAdapter dbAdapter, String token) { + private int sendAllData(MPDbAdapter dbAdapter, String token) { final RemoteService poster = getPoster(); if (!poster.isOnline(mContext, mConfig.getOfflineMode())) { logAboutMessageToMixpanel("Not flushing data to Mixpanel because the device is not connected to the internet."); - return; + return -1; } - sendData(dbAdapter, token, MPDbAdapter.Table.EVENTS, mConfig.getEventsEndpoint()); - sendData(dbAdapter, token, MPDbAdapter.Table.PEOPLE, mConfig.getPeopleEndpoint()); - sendData(dbAdapter, token, MPDbAdapter.Table.GROUPS, mConfig.getGroupsEndpoint()); + int eventsSent = sendData(dbAdapter, token, MPDbAdapter.Table.EVENTS, mConfig.getEventsEndpoint()); + int peopleSent = sendData(dbAdapter, token, MPDbAdapter.Table.PEOPLE, mConfig.getPeopleEndpoint()); + int groupsSent = sendData(dbAdapter, token, MPDbAdapter.Table.GROUPS, mConfig.getGroupsEndpoint()); + + logAboutMessageToMixpanel("Data flushed to Mixpanel successfully. Events sent: " + eventsSent + ", People sent: " + peopleSent + ", Groups sent: " + groupsSent); + return eventsSent; } - private void sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table table, String url) { + private int sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table table, String url) { + final RemoteService poster = getPoster(); String[] eventsData = dbAdapter.generateDataString(table, token); Integer queueCount = 0; @@ -498,6 +547,8 @@ private void sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table tab queueCount = Integer.valueOf(eventsData[2]); } + int statusCode = -1; // Initialize with an invalid status code + while (eventsData != null && queueCount > 0) { final String lastId = eventsData[0]; final String rawMessage = eventsData[1]; @@ -510,28 +561,22 @@ private void sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table tab } boolean deleteEvents = true; - byte[] response; try { final SSLSocketFactory socketFactory = mConfig.getSSLSocketFactory(); - response = poster.performRequest(url, params, socketFactory); - if (null == response) { + statusCode = poster.performRequest(url, params, socketFactory); // Update the status code + if (-1 == statusCode) { deleteEvents = false; logAboutMessageToMixpanel("Response was null, unexpected failure posting to " + url + "."); } else { deleteEvents = true; // Delete events on any successful post, regardless of 1 or 0 response - String parsedResponse; - try { - parsedResponse = new String(response, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("UTF not supported on this platform?", e); - } + if (mFailedRetries > 0) { mFailedRetries = 0; removeMessages(FLUSH_QUEUE, token); } logAboutMessageToMixpanel("Successfully posted to " + url + ": \n" + rawMessage); - logAboutMessageToMixpanel("Response was " + parsedResponse); + logAboutMessageToMixpanel("Response was " + statusCode); } } catch (final OutOfMemoryError e) { MPLog.e(LOGTAG, "Out of memory when posting to " + url + ".", e); @@ -570,8 +615,9 @@ private void sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table tab queueCount = Integer.valueOf(eventsData[2]); } } - } + return statusCode; // Return the status code + } private JSONObject getDefaultEventProperties() throws JSONException { final JSONObject ret = new JSONObject(); diff --git a/src/main/java/com/mixpanel/android/mpmetrics/MPConfig.java b/src/main/java/com/mixpanel/android/mpmetrics/MPConfig.java index fcb7a083..d807c6fd 100644 --- a/src/main/java/com/mixpanel/android/mpmetrics/MPConfig.java +++ b/src/main/java/com/mixpanel/android/mpmetrics/MPConfig.java @@ -185,7 +185,7 @@ public synchronized void setOfflineMode(OfflineMode offlineMode) { } mBulkUploadLimit = metaData.getInt("com.mixpanel.android.MPConfig.BulkUploadLimit", 40); // 40 records default - mFlushInterval = metaData.getInt("com.mixpanel.android.MPConfig.FlushInterval", 60 * 1000); // one minute default + mFlushInterval = metaData.getInt("com.mixpanel.android.MPConfig.FlushInterval", Integer.MAX_VALUE); // one minute default mFlushBatchSize = metaData.getInt("com.mixpanel.android.MPConfig.FlushBatchSize", 50); // flush 50 events at a time by default mFlushOnBackground = metaData.getBoolean("com.mixpanel.android.MPConfig.FlushOnBackground", true); mMinimumDatabaseLimit = metaData.getInt("com.mixpanel.android.MPConfig.MinimumDatabaseLimit", 20 * 1024 * 1024); // 20 Mb diff --git a/src/main/java/com/mixpanel/android/mpmetrics/MixpanelAPI.java b/src/main/java/com/mixpanel/android/mpmetrics/MixpanelAPI.java index e5608cc3..d1926e20 100644 --- a/src/main/java/com/mixpanel/android/mpmetrics/MixpanelAPI.java +++ b/src/main/java/com/mixpanel/android/mpmetrics/MixpanelAPI.java @@ -14,6 +14,8 @@ import android.os.Bundle; import com.mixpanel.android.util.MPLog; +import com.mixpanel.android.util.RemoteService; +import com.mixpanel.android.util.HttpService; import org.json.JSONArray; import org.json.JSONException; @@ -841,6 +843,26 @@ public void flush() { mMessages.postToServer(new AnalyticsMessages.MixpanelDescription(mToken)); } + + /** + * Synchronized flush to make sure all events sent before continuing + * made exclusively for AppDome purposes + * + * @return HTTP status code / -1 if device is failed + */ + public int blockingFlush() { + if (hasOptedOutTracking()) + return -1; + + final RemoteService poster = new HttpService(); + if (!poster.isOnline(mContext, mConfig.getOfflineMode())) { + MPLog.w(LOGTAG, "Not flushing data to Mixpanel because the device is not connected to the internet."); + return 503; + } + + return mMessages.postToServer(new AnalyticsMessages.MixpanelDescription(mToken), true); + } + /** * Returns a json object of the user's current super properties * diff --git a/src/main/java/com/mixpanel/android/util/HttpService.java b/src/main/java/com/mixpanel/android/util/HttpService.java index 97e0a5d2..911f0b45 100644 --- a/src/main/java/com/mixpanel/android/util/HttpService.java +++ b/src/main/java/com/mixpanel/android/util/HttpService.java @@ -88,10 +88,10 @@ private boolean onOfflineMode(OfflineMode offlineMode) { } @Override - public byte[] performRequest(String endpointUrl, Map params, SSLSocketFactory socketFactory) throws ServiceUnavailableException, IOException { + public int performRequest(String endpointUrl, Map params, SSLSocketFactory socketFactory) throws ServiceUnavailableException, IOException { MPLog.v(LOGTAG, "Attempting request to " + endpointUrl); - byte[] response = null; + int statusCode = -1; // the while(retries) loop is a workaround for a bug in some Android HttpURLConnection // libraries- The underlying library will attempt to reuse stale connections, @@ -133,10 +133,7 @@ public byte[] performRequest(String endpointUrl, Map params, SSL out.close(); out = null; } - in = connection.getInputStream(); - response = slurp(in); - in.close(); - in = null; + statusCode = connection.getResponseCode(); succeeded = true; } catch (final EOFException e) { MPLog.d(LOGTAG, "Failure to connect, likely caused by a known issue with Android lib. Retrying."); @@ -162,7 +159,7 @@ public byte[] performRequest(String endpointUrl, Map params, SSL if (retries >= 3) { MPLog.v(LOGTAG, "Could not connect to Mixpanel service after three retries."); } - return response; + return statusCode; } private static byte[] slurp(final InputStream inputStream) diff --git a/src/main/java/com/mixpanel/android/util/RemoteService.java b/src/main/java/com/mixpanel/android/util/RemoteService.java index c73582f9..0d7b439d 100644 --- a/src/main/java/com/mixpanel/android/util/RemoteService.java +++ b/src/main/java/com/mixpanel/android/util/RemoteService.java @@ -14,7 +14,7 @@ public interface RemoteService { void checkIsMixpanelBlocked(); - byte[] performRequest(String endpointUrl, Map params, SSLSocketFactory socketFactory) + int performRequest(String endpointUrl, Map params, SSLSocketFactory socketFactory) throws ServiceUnavailableException, IOException; class ServiceUnavailableException extends Exception {