code cleanup
* migrate few annotations to androidx * mission recovery: better error handling (except StreamExtractor.getErrorMessage() method always returns an error) * post-processing: more detailed progress [file specific changes] DownloadMission.java * remove redundant/boilerplate code (again) * make few variables volatile * better file "length" approximation * use "done" variable to count the amount of bytes downloaded (simplify percent calc in UI code) Postprocessing.java * if case of error use "ERROR_POSTPROCESSING" instead of "ERROR_UNKNOWN_EXCEPTION" * simplify source stream init DownloadManager.java * move all "service message sending" code to DownloadMission * remove not implemented method "notifyUserPendingDownloads()" also his unused strings DownloadManagerService.java * use START_STICKY instead of START_NOT_STICKY * simplify addMissionEventListener()/removeMissionEventListener() methods (always are called from the main thread) Deleter.java * better method definition MissionAdapter.java * better method definition * code cleanup * the UI is now refreshed every 750ms * simplify download progress calculation * indicates if the download is actually recovering * smooth download speed measure * show estimated remain time MainFragment.java: * check if viewPager is null (issued by "Apply changes" feature of Android Studio)
This commit is contained in:
parent
763995d4c9
commit
e6d9d8e26d
53 changed files with 554 additions and 622 deletions
|
|
@ -4,18 +4,21 @@ import android.os.Handler;
|
|||
import android.util.Log;
|
||||
|
||||
import androidx.annotation.Nullable;
|
||||
import androidx.annotation.NonNull;
|
||||
|
||||
import org.schabi.newpipe.DownloaderImpl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.io.Serializable;
|
||||
import java.net.ConnectException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URL;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.channels.ClosedByInterruptException;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
|
||||
|
|
@ -27,7 +30,7 @@ import us.shandian.giga.util.Utility;
|
|||
import static org.schabi.newpipe.BuildConfig.DEBUG;
|
||||
|
||||
public class DownloadMission extends Mission {
|
||||
private static final long serialVersionUID = 6L;// last bump: 28 september 2019
|
||||
private static final long serialVersionUID = 6L;// last bump: 07 october 2019
|
||||
|
||||
static final int BUFFER_SIZE = 64 * 1024;
|
||||
static final int BLOCK_SIZE = 512 * 1024;
|
||||
|
|
@ -61,9 +64,9 @@ public class DownloadMission extends Mission {
|
|||
public String[] urls;
|
||||
|
||||
/**
|
||||
* Number of bytes downloaded
|
||||
* Number of bytes downloaded and written
|
||||
*/
|
||||
public long done;
|
||||
public volatile long done;
|
||||
|
||||
/**
|
||||
* Indicates a file generated dynamically on the web server
|
||||
|
|
@ -119,7 +122,7 @@ public class DownloadMission extends Mission {
|
|||
/**
|
||||
* Download/File resume offset in fallback mode (if applicable) {@link DownloadRunnableFallback}
|
||||
*/
|
||||
long fallbackResumeOffset;
|
||||
volatile long fallbackResumeOffset;
|
||||
|
||||
/**
|
||||
* Maximum of download threads running, chosen by the user
|
||||
|
|
@ -132,22 +135,23 @@ public class DownloadMission extends Mission {
|
|||
public MissionRecoveryInfo[] recoveryInfo;
|
||||
|
||||
private transient int finishCount;
|
||||
public transient boolean running;
|
||||
public transient volatile boolean running;
|
||||
public boolean enqueued;
|
||||
|
||||
public int errCode = ERROR_NOTHING;
|
||||
public Exception errObject = null;
|
||||
|
||||
public transient Handler mHandler;
|
||||
private transient boolean mWritingToFile;
|
||||
private transient boolean[] blockAcquired;
|
||||
|
||||
private transient long writingToFileNext;
|
||||
private transient volatile boolean writingToFile;
|
||||
|
||||
final Object LOCK = new Lock();
|
||||
|
||||
private transient boolean deleted;
|
||||
|
||||
public transient volatile Thread[] threads = new Thread[0];
|
||||
private transient Thread init = null;
|
||||
@NonNull
|
||||
public transient Thread[] threads = new Thread[0];
|
||||
public transient Thread init = null;
|
||||
|
||||
public DownloadMission(String[] urls, StoredFileHelper storage, char kind, Postprocessing psInstance) {
|
||||
if (urls == null) throw new NullPointerException("urls is null");
|
||||
|
|
@ -246,8 +250,10 @@ public class DownloadMission extends Mission {
|
|||
int statusCode = conn.getResponseCode();
|
||||
|
||||
if (DEBUG) {
|
||||
Log.d(TAG, threadId + ":Range=" + conn.getRequestProperty("Range"));
|
||||
Log.d(TAG, threadId + ":Content-Length=" + conn.getContentLength() + " Code:" + statusCode);
|
||||
Log.d(TAG, threadId + ":[request] Range=" + conn.getRequestProperty("Range"));
|
||||
Log.d(TAG, threadId + ":[response] Code=" + statusCode);
|
||||
Log.d(TAG, threadId + ":[response] Content-Length=" + conn.getContentLength());
|
||||
Log.d(TAG, threadId + ":[response] Content-Range=" + conn.getHeaderField("Content-Range"));
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -272,24 +278,19 @@ public class DownloadMission extends Mission {
|
|||
}
|
||||
|
||||
synchronized void notifyProgress(long deltaLen) {
|
||||
if (!running) return;
|
||||
|
||||
if (unknownLength) {
|
||||
length += deltaLen;// Update length before proceeding
|
||||
}
|
||||
|
||||
done += deltaLen;
|
||||
|
||||
if (done > length) {
|
||||
done = length;
|
||||
}
|
||||
if (metadata == null) return;
|
||||
|
||||
if (done != length && !deleted && !mWritingToFile) {
|
||||
mWritingToFile = true;
|
||||
runAsync(-2, this::writeThisToFile);
|
||||
if (!writingToFile && (done > writingToFileNext || deltaLen < 0)) {
|
||||
writingToFile = true;
|
||||
writingToFileNext = done + BLOCK_SIZE;
|
||||
writeThisToFileAsync();
|
||||
}
|
||||
|
||||
notify(DownloadManagerService.MESSAGE_PROGRESS);
|
||||
}
|
||||
|
||||
synchronized void notifyError(Exception err) {
|
||||
|
|
@ -342,43 +343,42 @@ public class DownloadMission extends Mission {
|
|||
|
||||
notify(DownloadManagerService.MESSAGE_ERROR);
|
||||
|
||||
if (running) {
|
||||
running = false;
|
||||
if (threads != null) selfPause();
|
||||
}
|
||||
if (running) pauseThreads();
|
||||
}
|
||||
|
||||
synchronized void notifyFinished() {
|
||||
if (errCode > ERROR_NOTHING) return;
|
||||
|
||||
finishCount++;
|
||||
|
||||
if (blocks.length < 1 || threads == null || finishCount == threads.length) {
|
||||
if (errCode != ERROR_NOTHING) return;
|
||||
if (current < urls.length) {
|
||||
if (++finishCount < threads.length) return;
|
||||
|
||||
if (DEBUG) {
|
||||
Log.d(TAG, "onFinish: " + (current + 1) + "/" + urls.length);
|
||||
}
|
||||
|
||||
if ((current + 1) < urls.length) {
|
||||
// prepare next sub-mission
|
||||
long current_offset = offsets[current++];
|
||||
offsets[current] = current_offset + length;
|
||||
initializer();
|
||||
return;
|
||||
Log.d(TAG, "onFinish: downloaded " + (current + 1) + "/" + urls.length);
|
||||
}
|
||||
|
||||
current++;
|
||||
unknownLength = false;
|
||||
|
||||
if (!doPostprocessing()) return;
|
||||
|
||||
enqueued = false;
|
||||
running = false;
|
||||
deleteThisFromFile();
|
||||
|
||||
notify(DownloadManagerService.MESSAGE_FINISHED);
|
||||
if (current < urls.length) {
|
||||
// prepare next sub-mission
|
||||
offsets[current] = offsets[current - 1] + length;
|
||||
initializer();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (psAlgorithm != null && psState == 0) {
|
||||
threads = new Thread[]{
|
||||
runAsync(1, this::doPostprocessing)
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// this mission is fully finished
|
||||
|
||||
unknownLength = false;
|
||||
enqueued = false;
|
||||
running = false;
|
||||
|
||||
deleteThisFromFile();
|
||||
notify(DownloadManagerService.MESSAGE_FINISHED);
|
||||
}
|
||||
|
||||
private void notifyPostProcessing(int state) {
|
||||
|
|
@ -396,10 +396,15 @@ public class DownloadMission extends Mission {
|
|||
|
||||
Log.d(TAG, action + " postprocessing on " + storage.getName());
|
||||
|
||||
if (state == 2) {
|
||||
psState = state;
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (LOCK) {
|
||||
// don't return without fully write the current state
|
||||
psState = state;
|
||||
Utility.writeToFile(metadata, DownloadMission.this);
|
||||
writeThisToFile();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -411,12 +416,7 @@ public class DownloadMission extends Mission {
|
|||
if (running || isFinished() || urls.length < 1) return;
|
||||
|
||||
// ensure that the previous state is completely paused.
|
||||
int maxWait = 10000;// 10 seconds
|
||||
joinForThread(init, maxWait);
|
||||
if (threads != null) {
|
||||
for (Thread thread : threads) joinForThread(thread, maxWait);
|
||||
threads = null;
|
||||
}
|
||||
joinForThreads(10000);
|
||||
|
||||
running = true;
|
||||
errCode = ERROR_NOTHING;
|
||||
|
|
@ -427,12 +427,14 @@ public class DownloadMission extends Mission {
|
|||
}
|
||||
|
||||
if (current >= urls.length) {
|
||||
runAsync(1, this::notifyFinished);
|
||||
notifyFinished();
|
||||
return;
|
||||
}
|
||||
|
||||
notify(DownloadManagerService.MESSAGE_RUNNING);
|
||||
|
||||
if (urls[current] == null) {
|
||||
doRecover(null);
|
||||
doRecover(ERROR_RESOURCE_GONE);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -446,18 +448,13 @@ public class DownloadMission extends Mission {
|
|||
blockAcquired = new boolean[blocks.length];
|
||||
|
||||
if (blocks.length < 1) {
|
||||
if (unknownLength) {
|
||||
done = 0;
|
||||
length = 0;
|
||||
}
|
||||
|
||||
threads = new Thread[]{runAsync(1, new DownloadRunnableFallback(this))};
|
||||
} else {
|
||||
int remainingBlocks = 0;
|
||||
for (int block : blocks) if (block >= 0) remainingBlocks++;
|
||||
|
||||
if (remainingBlocks < 1) {
|
||||
runAsync(1, this::notifyFinished);
|
||||
notifyFinished();
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -483,6 +480,7 @@ public class DownloadMission extends Mission {
|
|||
}
|
||||
|
||||
running = false;
|
||||
notify(DownloadManagerService.MESSAGE_PAUSED);
|
||||
|
||||
if (init != null && init.isAlive()) {
|
||||
// NOTE: if start() method is running ¡will no have effect!
|
||||
|
|
@ -497,29 +495,14 @@ public class DownloadMission extends Mission {
|
|||
Log.w(TAG, "pausing a download that can not be resumed (range requests not allowed by the server).");
|
||||
}
|
||||
|
||||
// check if the calling thread (alias UI thread) is interrupted
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
writeThisToFile();
|
||||
return;
|
||||
}
|
||||
|
||||
// wait for all threads are suspended before save the state
|
||||
if (threads != null) runAsync(-1, this::selfPause);
|
||||
init = null;
|
||||
pauseThreads();
|
||||
}
|
||||
|
||||
private void selfPause() {
|
||||
try {
|
||||
for (Thread thread : threads) {
|
||||
if (thread.isAlive()) {
|
||||
thread.interrupt();
|
||||
thread.join(5000);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// nothing to do
|
||||
} finally {
|
||||
writeThisToFile();
|
||||
}
|
||||
private void pauseThreads() {
|
||||
running = false;
|
||||
joinForThreads(-1);
|
||||
writeThisToFile();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -527,9 +510,10 @@ public class DownloadMission extends Mission {
|
|||
*/
|
||||
@Override
|
||||
public boolean delete() {
|
||||
deleted = true;
|
||||
if (psAlgorithm != null) psAlgorithm.cleanupTemporalDir();
|
||||
|
||||
notify(DownloadManagerService.MESSAGE_DELETED);
|
||||
|
||||
boolean res = deleteThisFromFile();
|
||||
|
||||
if (!super.delete()) return false;
|
||||
|
|
@ -544,35 +528,37 @@ public class DownloadMission extends Mission {
|
|||
* @param persistChanges {@code true} to commit changes to the metadata file, otherwise, {@code false}
|
||||
*/
|
||||
public void resetState(boolean rollback, boolean persistChanges, int errorCode) {
|
||||
done = 0;
|
||||
length = 0;
|
||||
errCode = errorCode;
|
||||
errObject = null;
|
||||
unknownLength = false;
|
||||
threads = null;
|
||||
threads = new Thread[0];
|
||||
fallbackResumeOffset = 0;
|
||||
blocks = null;
|
||||
blockAcquired = null;
|
||||
|
||||
if (rollback) current = 0;
|
||||
|
||||
if (persistChanges)
|
||||
Utility.writeToFile(metadata, DownloadMission.this);
|
||||
if (persistChanges) writeThisToFile();
|
||||
}
|
||||
|
||||
private void initializer() {
|
||||
init = runAsync(DownloadInitializer.mId, new DownloadInitializer(this));
|
||||
}
|
||||
|
||||
private void writeThisToFileAsync() {
|
||||
runAsync(-2, this::writeThisToFile);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write this {@link DownloadMission} to the meta file asynchronously
|
||||
* if no thread is already running.
|
||||
*/
|
||||
void writeThisToFile() {
|
||||
synchronized (LOCK) {
|
||||
if (deleted) return;
|
||||
Utility.writeToFile(metadata, DownloadMission.this);
|
||||
if (metadata == null) return;
|
||||
Utility.writeToFile(metadata, this);
|
||||
writingToFile = false;
|
||||
}
|
||||
mWritingToFile = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -625,11 +611,10 @@ public class DownloadMission extends Mission {
|
|||
public long getLength() {
|
||||
long calculated;
|
||||
if (psState == 1 || psState == 3) {
|
||||
calculated = length;
|
||||
} else {
|
||||
calculated = offsets[current < offsets.length ? current : (offsets.length - 1)] + length;
|
||||
return length;
|
||||
}
|
||||
|
||||
calculated = offsets[current < offsets.length ? current : (offsets.length - 1)] + length;
|
||||
calculated -= offsets[0];// don't count reserved space
|
||||
|
||||
return calculated > nearLength ? calculated : nearLength;
|
||||
|
|
@ -642,7 +627,7 @@ public class DownloadMission extends Mission {
|
|||
*/
|
||||
public void setEnqueued(boolean queue) {
|
||||
enqueued = queue;
|
||||
runAsync(-2, this::writeThisToFile);
|
||||
writeThisToFileAsync();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -681,24 +666,19 @@ public class DownloadMission extends Mission {
|
|||
* @return {@code true} if the mission is running a recovery procedure, otherwise, {@code false}
|
||||
*/
|
||||
public boolean isRecovering() {
|
||||
return threads != null && threads.length > 0 && threads[0] instanceof DownloadRunnable && threads[0].isAlive();
|
||||
return threads.length > 0 && threads[0] instanceof DownloadMissionRecover && threads[0].isAlive();
|
||||
}
|
||||
|
||||
private boolean doPostprocessing() {
|
||||
if (psAlgorithm == null || psState == 2) return true;
|
||||
|
||||
private void doPostprocessing() {
|
||||
errCode = ERROR_NOTHING;
|
||||
errObject = null;
|
||||
Thread thread = Thread.currentThread();
|
||||
|
||||
notifyPostProcessing(1);
|
||||
notifyProgress(0);
|
||||
|
||||
if (DEBUG)
|
||||
Thread.currentThread().setName("[" + TAG + "] ps = " +
|
||||
psAlgorithm.getClass().getSimpleName() +
|
||||
" filename = " + storage.getName()
|
||||
);
|
||||
|
||||
threads = new Thread[]{Thread.currentThread()};
|
||||
if (DEBUG) {
|
||||
thread.setName("[" + TAG + "] ps = " + psAlgorithm + " filename = " + storage.getName());
|
||||
}
|
||||
|
||||
Exception exception = null;
|
||||
|
||||
|
|
@ -707,6 +687,11 @@ public class DownloadMission extends Mission {
|
|||
} catch (Exception err) {
|
||||
Log.e(TAG, "Post-processing failed. " + psAlgorithm.toString(), err);
|
||||
|
||||
if (err instanceof InterruptedIOException || err instanceof ClosedByInterruptException || thread.isInterrupted()) {
|
||||
notifyError(DownloadMission.ERROR_POSTPROCESSING_STOPPED, null);
|
||||
return;
|
||||
}
|
||||
|
||||
if (errCode == ERROR_NOTHING) errCode = ERROR_POSTPROCESSING;
|
||||
|
||||
exception = err;
|
||||
|
|
@ -717,56 +702,38 @@ public class DownloadMission extends Mission {
|
|||
if (errCode != ERROR_NOTHING) {
|
||||
if (exception == null) exception = errObject;
|
||||
notifyError(ERROR_POSTPROCESSING, exception);
|
||||
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
|
||||
return true;
|
||||
notifyFinished();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to recover the download
|
||||
*
|
||||
* @param fromError exception which require update the url from the source
|
||||
* @param errorCode error code which trigger the recovery procedure
|
||||
*/
|
||||
void doRecover(Exception fromError) {
|
||||
void doRecover(int errorCode) {
|
||||
Log.i(TAG, "Attempting to recover the mission: " + storage.getName());
|
||||
|
||||
if (recoveryInfo == null) {
|
||||
if (fromError == null)
|
||||
notifyError(ERROR_RESOURCE_GONE, null);
|
||||
else
|
||||
notifyError(fromError);
|
||||
|
||||
notifyError(errorCode, null);
|
||||
urls = new String[0];// mark this mission as dead
|
||||
return;
|
||||
}
|
||||
|
||||
if (threads != null) {
|
||||
for (Thread thread : threads) {
|
||||
if (thread == Thread.currentThread()) continue;
|
||||
thread.interrupt();
|
||||
joinForThread(thread, 0);
|
||||
}
|
||||
}
|
||||
|
||||
errCode = ERROR_NOTHING;
|
||||
errObject = null;
|
||||
|
||||
if (recoveryInfo[current].attempts >= maxRetry) {
|
||||
recoveryInfo[current].attempts = 0;
|
||||
notifyError(fromError);
|
||||
return;
|
||||
}
|
||||
joinForThreads(0);
|
||||
|
||||
threads = new Thread[]{
|
||||
runAsync(DownloadMissionRecover.mID, new DownloadMissionRecover(this, fromError))
|
||||
runAsync(DownloadMissionRecover.mID, new DownloadMissionRecover(this, errorCode))
|
||||
};
|
||||
}
|
||||
|
||||
private boolean deleteThisFromFile() {
|
||||
synchronized (LOCK) {
|
||||
return metadata.delete();
|
||||
boolean res = metadata.delete();
|
||||
metadata = null;
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -776,8 +743,8 @@ public class DownloadMission extends Mission {
|
|||
* @param id id of new thread (used for debugging only)
|
||||
* @param who the Runnable whose {@code run} method is invoked.
|
||||
*/
|
||||
private void runAsync(int id, Runnable who) {
|
||||
runAsync(id, new Thread(who));
|
||||
private Thread runAsync(int id, Runnable who) {
|
||||
return runAsync(id, new Thread(who));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -806,28 +773,44 @@ public class DownloadMission extends Mission {
|
|||
/**
|
||||
* Waits at most {@code millis} milliseconds for the thread to die
|
||||
*
|
||||
* @param thread the desired thread
|
||||
* @param millis the time to wait in milliseconds
|
||||
*/
|
||||
private void joinForThread(Thread thread, int millis) {
|
||||
if (thread == null || !thread.isAlive()) return;
|
||||
if (thread == Thread.currentThread()) return;
|
||||
private void joinForThreads(int millis) {
|
||||
final Thread currentThread = Thread.currentThread();
|
||||
|
||||
if (DEBUG) {
|
||||
Log.w(TAG, "a thread is !still alive!: " + thread.getName());
|
||||
if (init != null && init != currentThread && init.isAlive()) {
|
||||
init.interrupt();
|
||||
|
||||
if (millis > 0) {
|
||||
try {
|
||||
init.join(millis);
|
||||
} catch (InterruptedException e) {
|
||||
Log.w(TAG, "Initializer thread is still running", e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// still alive, this should not happen.
|
||||
// Possible reasons:
|
||||
// if a thread is still alive, possible reasons:
|
||||
// slow device
|
||||
// the user is spamming start/pause buttons
|
||||
// start() method called quickly after pause()
|
||||
|
||||
for (Thread thread : threads) {
|
||||
if (!thread.isAlive() || thread == Thread.currentThread()) continue;
|
||||
thread.interrupt();
|
||||
}
|
||||
|
||||
try {
|
||||
thread.join(millis);
|
||||
for (Thread thread : threads) {
|
||||
if (!thread.isAlive()) continue;
|
||||
if (DEBUG) {
|
||||
Log.w(TAG, "thread alive: " + thread.getName());
|
||||
}
|
||||
if (millis > 0) thread.join(millis);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Log.d(TAG, "timeout on join : " + thread.getName());
|
||||
throw new RuntimeException("A thread is still running:\n" + thread.getName());
|
||||
throw new RuntimeException("A download thread is still running", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue