package us.shandian.giga.get; import android.os.Handler; import android.os.Message; import android.util.Log; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.ConnectException; import java.net.HttpURLConnection; import java.net.URL; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import javax.net.ssl.SSLException; import us.shandian.giga.postprocessing.Postprocessing; import us.shandian.giga.service.DownloadManagerService; import us.shandian.giga.util.Utility; import static org.schabi.newpipe.BuildConfig.DEBUG; public class DownloadMission extends Mission { private static final long serialVersionUID = 3L;// last bump: 8 november 2018 static final int BUFFER_SIZE = 64 * 1024; final static int BLOCK_SIZE = 512 * 1024; private static final String TAG = "DownloadMission"; public static final int ERROR_NOTHING = -1; public static final int ERROR_PATH_CREATION = 1000; public static final int ERROR_FILE_CREATION = 1001; public static final int ERROR_UNKNOWN_EXCEPTION = 1002; public static final int ERROR_PERMISSION_DENIED = 1003; public static final int ERROR_SSL_EXCEPTION = 1004; public static final int ERROR_UNKNOWN_HOST = 1005; public static final int ERROR_CONNECT_HOST = 1006; public static final int ERROR_POSTPROCESSING = 1007; public static final int ERROR_HTTP_NO_CONTENT = 204; public static final int ERROR_HTTP_UNSUPPORTED_RANGE = 206; /** * The urls of the file to download */ public String[] urls; /** * Number of blocks the size of {@link DownloadMission#BLOCK_SIZE} */ long blocks = -1; /** * Number of bytes downloaded */ public long done; /** * Indicates a file generated dynamically on the web server */ public boolean unknownLength; /** * offset in the file where the data should be written */ public long[] offsets; /** * The post-processing algorithm arguments */ public String[] postprocessingArgs; /** * The post-processing algorithm name */ public String postprocessingName; /** * Indicates if the post-processing state: * 0: ready * 1: running * 2: completed */ public int postprocessingState; /** * Indicate if the post-processing algorithm works on the same file */ public boolean postprocessingThis; /** * The current resource to download {@code urls[current]} */ public int current; /** * Metadata where the mission state is saved */ public File metadata; /** * maximum attempts */ public int maxRetry; /** * Approximated final length, this represent the sum of all resources sizes */ public long nearLength; public int threadCount = 3; boolean fallback; private int finishCount; public transient boolean running; public transient boolean enqueued = true; public int errCode = ERROR_NOTHING; public transient Exception errObject = null; public transient boolean recovered; public transient Handler mHandler; private transient boolean mWritingToFile; @SuppressWarnings("UseSparseArrays")// LongSparseArray is not serializable final HashMap blockState = new HashMap<>(); final List threadBlockPositions = new ArrayList<>(); final List threadBytePositions = new ArrayList<>(); private transient boolean deleted; int currentThreadCount; private transient Thread[] threads = new Thread[0]; private transient Thread init = null; protected DownloadMission() { } public DownloadMission(String url, String name, String location, char kind) { this(new String[]{url}, name, location, kind, null, null); } public DownloadMission(String[] urls, String name, String location, char kind, String postprocessingName, String[] postprocessingArgs) { if (name == null) throw new NullPointerException("name is null"); if (name.isEmpty()) throw new IllegalArgumentException("name is empty"); if (urls == null) throw new NullPointerException("urls is null"); if (urls.length < 1) throw new IllegalArgumentException("urls is empty"); if (location == null) throw new NullPointerException("location is null"); if (location.isEmpty()) throw new IllegalArgumentException("location is empty"); this.urls = urls; this.name = name; this.location = location; this.kind = kind; this.offsets = new long[urls.length]; if (postprocessingName != null) { Postprocessing algorithm = Postprocessing.getAlgorithm(postprocessingName, null); this.postprocessingThis = algorithm.worksOnSameFile; this.offsets[0] = algorithm.recommendedReserve; this.postprocessingName = postprocessingName; this.postprocessingArgs = postprocessingArgs; } else { if (DEBUG && urls.length > 1) { Log.w(TAG, "mission created with multiple urls ¿missing post-processing algorithm?"); } } } private void checkBlock(long block) { if (block < 0 || block >= blocks) { throw new IllegalArgumentException("illegal block identifier"); } } /** * Check if a block is reserved * * @param block the block identifier * @return true if the block is reserved and false if otherwise */ boolean isBlockPreserved(long block) { checkBlock(block); return blockState.containsKey(block) ? blockState.get(block) : false; } void preserveBlock(long block) { checkBlock(block); synchronized (blockState) { blockState.put(block, true); } } /** * Set the block of the file * * @param threadId the identifier of the thread * @param position the block of the thread */ void setBlockPosition(int threadId, long position) { threadBlockPositions.set(threadId, position); } /** * Get the block of a file * * @param threadId the identifier of the thread * @return the block for the thread */ long getBlockPosition(int threadId) { return threadBlockPositions.get(threadId); } /** * Save the position of the desired thread * * @param threadId the identifier of the thread * @param position the relative position in bytes or zero */ void setThreadBytePosition(int threadId, long position) { threadBytePositions.set(threadId, position); } /** * Get position inside of the thread, where thread will be resumed * * @param threadId the identifier of the thread * @return the relative position in bytes or zero */ long getThreadBytePosition(int threadId) { return threadBytePositions.get(threadId); } /** * Open connection * * @param threadId id of the calling thread, used only for debug * @param rangeStart range start * @param rangeEnd range end * @return a {@link java.net.URLConnection URLConnection} linking to the URL. * @throws IOException if an I/O exception occurs. */ HttpURLConnection openConnection(int threadId, long rangeStart, long rangeEnd) throws IOException { URL url = new URL(urls[current]); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setInstanceFollowRedirects(true); if (rangeStart >= 0) { String req = "bytes=" + rangeStart + "-"; if (rangeEnd > 0) req += rangeEnd; conn.setRequestProperty("Range", req); if (DEBUG) { Log.d(TAG, threadId + ":" + conn.getRequestProperty("Range")); } } return conn; } /** * @param threadId id of the calling thread * @param conn Opens and establish the communication * @throws IOException if an error occurred connecting to the server. * @throws HttpError if the HTTP Status-Code is not satisfiable */ void establishConnection(int threadId, HttpURLConnection conn) throws IOException, HttpError { conn.connect(); int statusCode = conn.getResponseCode(); if (DEBUG) { Log.d(TAG, threadId + ":Content-Length=" + conn.getContentLength() + " Code:" + statusCode); } switch (statusCode) { case 204: case 205: case 207: throw new HttpError(conn.getResponseCode()); case 416: return;// let the download thread handle this error default: if (statusCode < 200 || statusCode > 299) { throw new HttpError(statusCode); } } } private void notify(int what) { Message m = new Message(); m.what = what; m.obj = this; mHandler.sendMessage(m); } synchronized void notifyProgress(long deltaLen) { if (!running) return; if (recovered) { recovered = false; } if (unknownLength) { length += deltaLen;// Update length before proceeding } done += deltaLen; if (done > length) { done = length; } if (done != length && !deleted && !mWritingToFile) { mWritingToFile = true; runAsync(-2, this::writeThisToFile); } notify(DownloadManagerService.MESSAGE_PROGRESS); } synchronized void notifyError(Exception err) { Log.e(TAG, "notifyError()", err); if (err instanceof FileNotFoundException) { notifyError(ERROR_FILE_CREATION, null); } else if (err instanceof SSLException) { notifyError(ERROR_SSL_EXCEPTION, null); } else if (err instanceof HttpError) { notifyError(((HttpError) err).statusCode, null); } else if (err instanceof ConnectException) { notifyError(ERROR_CONNECT_HOST, null); } else if (err instanceof UnknownHostException) { notifyError(ERROR_UNKNOWN_HOST, null); } else { notifyError(ERROR_UNKNOWN_EXCEPTION, err); } } synchronized void notifyError(int code, Exception err) { Log.e(TAG, "notifyError() code = " + code, err); errCode = code; errObject = err; pause(); notify(DownloadManagerService.MESSAGE_ERROR); } synchronized void notifyFinished() { if (errCode > ERROR_NOTHING) return; finishCount++; if (finishCount == currentThreadCount) { if (errCode != ERROR_NOTHING) return; if (DEBUG) { Log.d(TAG, "onFinish" + (current + 1) + "/" + urls.length); } if ((current + 1) < urls.length) { // prepare next sub-mission long current_offset = offsets[current++]; offsets[current] = current_offset + length; initializer(); return; } current++; unknownLength = false; if (!doPostprocessing()) return; running = false; deleteThisFromFile(); notify(DownloadManagerService.MESSAGE_FINISHED); } } private void notifyPostProcessing(int state) { if (DEBUG) { String action; switch (state) { case 1: action = "Running"; break; case 2: action = "Completed"; break; default: action = "Failed"; } Log.d(TAG, action + " postprocessing on " + location + File.separator + name); } synchronized (blockState) { // don't return without fully write the current state postprocessingState = state; Utility.writeToFile(metadata, DownloadMission.this); } } /** * Start downloading with multiple threads. */ public void start() { if (running || isFinished()) return; // ensure that the previous state is completely paused. joinForThread(init); if (threads != null) for (Thread thread : threads) joinForThread(thread); enqueued = false; running = true; errCode = ERROR_NOTHING; if (current >= urls.length && postprocessingName != null) { runAsync(1, () -> { if (doPostprocessing()) { running = false; deleteThisFromFile(); notify(DownloadManagerService.MESSAGE_FINISHED); } }); return; } if (blocks < 0) { initializer(); return; } init = null; if (threads == null || threads.length < 1) { threads = new Thread[currentThreadCount]; } if (fallback) { if (unknownLength) { done = 0; length = 0; } threads[0] = runAsync(1, new DownloadRunnableFallback(this)); } else { for (int i = 0; i < currentThreadCount; i++) { threads[i] = runAsync(i + 1, new DownloadRunnable(this, i)); } } } /** * Pause the mission, does not affect the blocks that are being downloaded. */ public synchronized void pause() { if (!running) return; if (isPsRunning()) { if (DEBUG) { Log.w(TAG, "pause during post-processing is not applicable."); } return; } running = false; recovered = true; enqueued = false; if (init != null && Thread.currentThread() != init && init.isAlive()) { init.interrupt(); synchronized (blockState) { resetState(); } return; } if (DEBUG && blocks == 0) { Log.w(TAG, "pausing a download that can not be resumed (range requests not allowed by the server)."); } if (threads == null || Thread.currentThread().isInterrupted()) { writeThisToFile(); return; } // wait for all threads are suspended before save the state runAsync(-1, () -> { try { for (Thread thread : threads) { if (thread.isAlive()) { thread.interrupt(); thread.join(5000); } } } catch (Exception e) { // nothing to do } finally { writeThisToFile(); } }); } /** * Removes the file and the meta file */ @Override public boolean delete() { deleted = true; boolean res = deleteThisFromFile(); if (!super.delete()) res = false; return res; } void resetState() { done = 0; blocks = -1; errCode = ERROR_NOTHING; fallback = false; unknownLength = false; finishCount = 0; threadBlockPositions.clear(); threadBytePositions.clear(); blockState.clear(); threads = new Thread[0]; Utility.writeToFile(metadata, DownloadMission.this); } private void initializer() { init = runAsync(DownloadInitializer.mId, new DownloadInitializer(this)); } /** * Write this {@link DownloadMission} to the meta file asynchronously * if no thread is already running. */ private void writeThisToFile() { synchronized (blockState) { if (deleted) return; Utility.writeToFile(metadata, DownloadMission.this); } mWritingToFile = false; } /** * Indicates if the download if fully finished * * @return true, otherwise, false */ public boolean isFinished() { return current >= urls.length && (postprocessingName == null || postprocessingState == 2); } /** * Indicates if the download file is corrupt due a failed post-processing * * @return {@code true} if this mission is unrecoverable */ public boolean isPsFailed() { return postprocessingName != null && errCode == DownloadMission.ERROR_POSTPROCESSING && postprocessingThis; } /** * Indicates if a post-processing algorithm is running * * @return true, otherwise, false */ public boolean isPsRunning() { return postprocessingName != null && postprocessingState == 1; } public long getLength() { long calculated; if (postprocessingState == 1) { calculated = length; } else { calculated = offsets[current < offsets.length ? current : (offsets.length - 1)] + length; } calculated -= offsets[0];// don't count reserved space return calculated > nearLength ? calculated : nearLength; } private boolean doPostprocessing() { if (postprocessingName == null || postprocessingState == 2) return true; notifyPostProcessing(1); notifyProgress(0); Thread.currentThread().setName("[" + TAG + "] post-processing = " + postprocessingName + " filename = " + name); Exception exception = null; try { Postprocessing .getAlgorithm(postprocessingName, this) .run(); } catch (Exception err) { StringBuilder args = new StringBuilder(" "); if (postprocessingArgs != null) { for (String arg : postprocessingArgs) { args.append(", "); args.append(arg); } args.delete(0, 1); } Log.e(TAG, String.format("Post-processing failed. algorithm = %s args = [%s]", postprocessingName, args), err); if (errCode == ERROR_NOTHING) errCode = ERROR_POSTPROCESSING; exception = err; } finally { notifyPostProcessing(errCode == ERROR_NOTHING ? 2 : 0); } if (errCode != ERROR_NOTHING) { if (exception == null) exception = errObject; notifyError(ERROR_POSTPROCESSING, exception); return false; } return true; } private boolean deleteThisFromFile() { synchronized (blockState) { return metadata.delete(); } } /** * run a new thread * * @param id id of new thread (used for debugging only) * @param who the Runnable whose {@code run} method is invoked. */ private void runAsync(int id, Runnable who) { runAsync(id, new Thread(who)); } /** * run a new thread * * @param id id of new thread (used for debugging only) * @param who the Thread whose {@code run} method is invoked when this thread is started * @return the passed thread */ private Thread runAsync(int id, Thread who) { // known thread ids: // -2: state saving by notifyProgress() method // -1: wait for saving the state by pause() method // 0: initializer // >=1: any download thread if (DEBUG) { who.setName(String.format("%s[%s] %s", TAG, id, name)); } who.start(); return who; } private void joinForThread(Thread thread) { if (thread == null || !thread.isAlive()) return; if (thread == Thread.currentThread()) return; if (DEBUG) { Log.w(TAG, "a thread is !still alive!: " + thread.getName()); } // still alive, this should not happen. // Possible reasons: // slow device // the user is spamming start/pause buttons // start() method called quickly after pause() try { thread.join(10000); } catch (InterruptedException e) { Log.d(TAG, "timeout on join : " + thread.getName()); throw new RuntimeException("A thread is still running:\n" + thread.getName()); } } static class HttpError extends Exception { int statusCode; HttpError(int statusCode) { this.statusCode = statusCode; } @Override public String getMessage() { return "HTTP " + String.valueOf(statusCode); } } }