/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.tunnel;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.commons.util.RetryExceedLimitException;
import com.aliyun.odps.commons.util.RetryStrategy;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordPack;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.tunnel.Configuration;
import com.aliyun.odps.tunnel.StreamUploadSessionImpl;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TunnelTableSchema;
import com.aliyun.odps.tunnel.io.Checksum;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import com.aliyun.odps.tunnel.io.TunnelBufferedWriter;
import com.aliyun.odps.tunnel.io.TunnelRecordReader;
import com.aliyun.odps.tunnel.io.TunnelRecordWriter;
import com.aliyun.odps.utils.StringUtils;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;

public class TableTunnel {
    private Configuration config;
    private Random random = new Random();

    public TableTunnel(Odps odps) {
        this.config = new Configuration(odps);
    }

    public Configuration getConfig() {
        return this.config;
    }

    public UploadSession createUploadSession(String projectName, String tableName) throws TunnelException {
        return new UploadSession(projectName, tableName, null, null, false);
    }

    public UploadSession createUploadSession(String projectName, String tableName, boolean overwrite) throws TunnelException {
        return new UploadSession(projectName, tableName, null, null, overwrite);
    }

    public UploadSession createUploadSession(String projectName, String tableName, PartitionSpec partitionSpec) throws TunnelException {
        return this.createUploadSession(projectName, tableName, partitionSpec, false);
    }

    public UploadSession createUploadSession(String projectName, String tableName, PartitionSpec partitionSpec, boolean overwrite) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new UploadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), null, overwrite);
    }

    public UploadSession getUploadSession(String projectName, String tableName, String id, long shares, long shareId) throws TunnelException {
        return this.getUploadSession(projectName, tableName, null, id, shares, shareId);
    }

    public UploadSession getUploadSession(String projectName, String tableName, PartitionSpec partitionSpec, String id, long shares, long shareId) throws TunnelException {
        if (shares < 1L) {
            throw new IllegalArgumentException("Invalid arguments, shares must >= 1");
        }
        if (shareId < 0L) {
            throw new IllegalArgumentException("Invalid arguments, shareId must >= 0");
        }
        if (shares <= shareId) {
            throw new IllegalArgumentException("Invalid arguments, shares must > shareId");
        }
        UploadSession session = partitionSpec != null ? this.getUploadSession(projectName, tableName, partitionSpec, id) : this.getUploadSession(projectName, tableName, id);
        session.shares = shares;
        session.curBlockId = shareId;
        return session;
    }

    public UploadSession getUploadSession(String projectName, String tableName, String id) throws TunnelException {
        return new UploadSession(projectName, tableName, null, id);
    }

    public UploadSession getUploadSession(String projectName, String tableName, PartitionSpec partitionSpec, String id) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new UploadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), id);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName) throws TunnelException {
        return this.createDownloadSession(projectName, tableName, false);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName, boolean async) throws TunnelException {
        return new DownloadSession(projectName, tableName, null, null, null, async);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), null, null, false);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec, boolean async) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), null, null, async);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName, long shardId) throws TunnelException {
        if (shardId < 0L) {
            throw new IllegalArgumentException("Invalid arguments, shard id required.");
        }
        return new DownloadSession(projectName, tableName, null, shardId, null, false);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec, long shardId) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        if (shardId < 0L) {
            throw new IllegalArgumentException("Invalid arguments, shard id required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), shardId, null, false);
    }

    public DownloadSession getDownloadSession(String projectName, String tableName, String id) throws TunnelException {
        return new DownloadSession(projectName, tableName, null, null, id, false);
    }

    public DownloadSession getDownloadSession(String projectName, String tableName, long shardId, String id) throws TunnelException {
        return new DownloadSession(projectName, tableName, null, shardId, id, false);
    }

    public DownloadSession getDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec, String id) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), null, id, false);
    }

    public DownloadSession getDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec, long shardId, String id) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        if (shardId < 0L) {
            throw new IllegalArgumentException("Invalid arguments, shard id required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), shardId, id, false);
    }

    private String getResource(String projectName, String tableName) {
        return this.config.getResource(projectName, tableName);
    }

    public void setEndpoint(String endpoint) {
        try {
            URI u = new URI(endpoint);
            this.config.setEndpoint(u);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid endpoint.");
        }
    }

    public StreamUploadSession createStreamUploadSession(String projectName, String tableName) throws TunnelException {
        return new StreamUploadSessionImpl(projectName, tableName, null, this.config);
    }

    public StreamUploadSession createStreamUploadSession(String projectName, String tableName, String partitionSpec) throws TunnelException {
        return this.createStreamUploadSession(projectName, tableName, new PartitionSpec(partitionSpec));
    }

    public StreamUploadSession createStreamUploadSession(String projectName, String tableName, PartitionSpec partitionSpec) throws TunnelException {
        return new StreamUploadSessionImpl(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), this.config);
    }

    static HashMap<String, String> getCommonHeader() {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("Content-Length", String.valueOf(0));
        headers.put("odps-tunnel-date-transform", "v1");
        return headers;
    }

    public class DownloadSession {
        private String id;
        private String projectName;
        private String tableName;
        private String partitionSpec;
        private Long shardId;
        private long count;
        private TableSchema schema = new TableSchema();
        private DownloadStatus status = DownloadStatus.UNKNOWN;
        private Configuration conf;
        private RestClient tunnelServiceClient;
        private boolean shouldTransform = false;

        DownloadSession(String projectName, String tableName, String partitionSpec, Long shardId, String downloadId, boolean async) throws TunnelException {
            this.conf = TableTunnel.this.config;
            this.projectName = projectName;
            this.tableName = tableName;
            this.partitionSpec = partitionSpec;
            this.shardId = shardId;
            this.id = downloadId;
            this.tunnelServiceClient = this.conf.newRestClient(projectName);
            if (this.id == null) {
                this.initiate(async);
            } else {
                this.reload();
            }
        }

        public TunnelRecordReader openRecordReader(long start, long count) throws TunnelException, IOException {
            return this.openRecordReader(start, count, false);
        }

        public TunnelRecordReader openRecordReader(long start, long count, boolean compress) throws TunnelException, IOException {
            return this.openRecordReader(start, count, compress, null);
        }

        public TunnelRecordReader openRecordReader(long start, long count, CompressOption compress) throws TunnelException, IOException {
            return this.openRecordReader(start, count, compress, null);
        }

        public TunnelRecordReader openRecordReader(long start, long count, boolean compress, List<Column> columns) throws TunnelException, IOException {
            CompressOption option = compress ? new CompressOption() : new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
            return this.openRecordReader(start, count, option, columns);
        }

        public TunnelRecordReader openRecordReader(long start, long count, CompressOption compress, List<Column> columns) throws TunnelException, IOException {
            TunnelRecordReader reader = new TunnelRecordReader(start, count, columns, compress, this.tunnelServiceClient, this);
            reader.setTransform(this.shouldTransform);
            return reader;
        }

        private void initiate(boolean async) throws TunnelException {
            HashMap<String, String> params = new HashMap<String, String>();
            HashMap<String, String> headers = TableTunnel.getCommonHeader();
            params.put("downloads", null);
            if (async) {
                params.put("asyncmode", "true");
            }
            if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                params.put("partition", this.partitionSpec);
            }
            if (this.shardId != null) {
                params.put("shard", String.valueOf(this.shardId));
            }
            Connection conn = null;
            try {
                conn = this.tunnelServiceClient.connect(this.getResource(), "POST", params, headers);
                Response resp = conn.getResponse();
                if (resp.isOK()) {
                    this.loadFromJson(conn.getInputStream());
                    this.shouldTransform = StringUtils.equals((String)resp.getHeader("odps-tunnel-date-transform"), (String)"true");
                } else {
                    throw new TunnelException(resp.getHeader("x-odps-request-id"), conn.getInputStream(), resp.getStatus());
                }
                while (this.status == DownloadStatus.INITIATING) {
                    Thread.sleep(TableTunnel.this.random.nextInt(30000) + 5000);
                    this.reload();
                }
            }
            catch (IOException e) {
                throw new TunnelException("Failed to create download session with tunnel endpoint " + this.tunnelServiceClient.getEndpoint(), e);
            }
            catch (TunnelException e) {
                throw e;
            }
            catch (OdpsException e) {
                throw new TunnelException(e.getMessage(), e);
            }
            catch (InterruptedException e) {
                throw new TunnelException(e.getMessage(), e);
            }
            finally {
                if (conn != null) {
                    try {
                        conn.disconnect();
                    }
                    catch (IOException iOException) {}
                }
            }
        }

        private void reload() throws TunnelException {
            block15: {
                HashMap<String, String> params = new HashMap<String, String>();
                HashMap<String, String> headers = TableTunnel.getCommonHeader();
                params.put("downloadid", this.id);
                if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                    params.put("partition", this.partitionSpec);
                }
                if (this.shardId != null) {
                    params.put("shard", String.valueOf(this.shardId));
                }
                Connection conn = null;
                try {
                    conn = this.tunnelServiceClient.connect(this.getResource(), "GET", params, headers);
                    Response resp = conn.getResponse();
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        this.shouldTransform = StringUtils.equals((String)resp.getHeader("odps-tunnel-date-transform"), (String)"true");
                        break block15;
                    }
                    TunnelException e = new TunnelException(conn.getInputStream());
                    e.setRequestId(resp.getHeader("x-odps-request-id"));
                    throw e;
                }
                catch (IOException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                catch (TunnelException e) {
                    throw e;
                }
                catch (OdpsException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                finally {
                    if (conn != null) {
                        try {
                            conn.disconnect();
                        }
                        catch (IOException iOException) {}
                    }
                }
            }
        }

        public TableSchema getSchema() {
            return this.schema;
        }

        public long getRecordCount() {
            return this.count;
        }

        public String getId() {
            return this.id;
        }

        public DownloadStatus getStatus() throws TunnelException, IOException {
            this.reload();
            return this.status;
        }

        public String getPartitionSpec() {
            return this.partitionSpec;
        }

        public String getProjectName() {
            return this.projectName;
        }

        public String getTableName() {
            return this.tableName;
        }

        private String getResource() {
            return this.conf.getResource(this.projectName, this.tableName);
        }

        private void loadFromJson(InputStream is) throws TunnelException {
            try {
                String json = IOUtils.readStreamAsString(is);
                JsonObject tree = new JsonParser().parse(json).getAsJsonObject();
                if (tree.has("DownloadID")) {
                    this.id = tree.get("DownloadID").getAsString();
                }
                if (tree.has("Status")) {
                    String downloadStatus = tree.get("Status").getAsString().toUpperCase();
                    this.status = DownloadStatus.valueOf(downloadStatus);
                }
                if (tree.has("RecordCount")) {
                    this.count = tree.get("RecordCount").getAsLong();
                }
                if (tree.has("Schema")) {
                    JsonObject tunnelTableSchema = tree.get("Schema").getAsJsonObject();
                    this.schema = new TunnelTableSchema(tunnelTableSchema);
                }
            }
            catch (Exception e) {
                throw new TunnelException("Invalid json content.", e);
            }
        }
    }

    public static enum DownloadStatus {
        UNKNOWN,
        NORMAL,
        CLOSED,
        EXPIRED,
        INITIATING;

    }

    public class UploadSession {
        private String id;
        private TableSchema schema = new TableSchema();
        private String projectName;
        private String tableName;
        private String partitionSpec;
        private List<Long> blocks = new ArrayList<Long>();
        private UploadStatus status = UploadStatus.UNKNOWN;
        private Configuration conf;
        private RestClient tunnelServiceClient;
        private final Long totalBLocks = 20000L;
        private Long shares = 1L;
        private Long curBlockId = 0L;
        private static final int RETRY_SLEEP_SECONDS = 5;
        private boolean shouldTransform = false;
        private boolean overwrite = false;

        UploadSession(String projectName, String tableName, String partitionSpec, String uploadId) throws TunnelException {
            this(projectName, tableName, partitionSpec, uploadId, false);
        }

        UploadSession(String projectName, String tableName, String partitionSpec, String uploadId, boolean overwrite) throws TunnelException {
            this.conf = TableTunnel.this.config;
            this.projectName = projectName;
            this.tableName = tableName;
            this.partitionSpec = partitionSpec;
            this.id = uploadId;
            this.overwrite = overwrite;
            this.tunnelServiceClient = this.conf.newRestClient(projectName);
            if (this.id == null) {
                this.initiate();
            } else {
                this.reload();
            }
        }

        private void initiate() throws TunnelException {
            block15: {
                HashMap<String, String> params = new HashMap<String, String>();
                params.put("uploads", null);
                if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                    params.put("partition", this.partitionSpec);
                }
                if (this.overwrite) {
                    params.put("overwrite", "true");
                }
                HashMap<String, String> headers = TableTunnel.getCommonHeader();
                Connection conn = null;
                try {
                    conn = this.tunnelServiceClient.connect(this.getResource(), "POST", params, headers);
                    Response resp = conn.getResponse();
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        this.shouldTransform = StringUtils.equals((String)resp.getHeader("odps-tunnel-date-transform"), (String)"true");
                        break block15;
                    }
                    throw new TunnelException(resp.getHeader("x-odps-request-id"), conn.getInputStream(), resp.getStatus());
                }
                catch (IOException e) {
                    throw new TunnelException("Failed to create upload session with tunnel endpoint " + this.tunnelServiceClient.getEndpoint(), e);
                }
                catch (TunnelException e) {
                    throw e;
                }
                catch (OdpsException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                finally {
                    if (conn != null) {
                        try {
                            conn.disconnect();
                        }
                        catch (IOException iOException) {}
                    }
                }
            }
        }

        public boolean isShouldTransform() {
            return this.shouldTransform;
        }

        public synchronized Long getAvailBlockId() {
            if (this.curBlockId >= this.totalBLocks) {
                throw new RuntimeException("No more available blockId, already " + this.curBlockId);
            }
            Long old = this.curBlockId;
            this.curBlockId = this.curBlockId + this.shares;
            return old;
        }

        public void commit() throws TunnelException, IOException {
            this.completeUpload();
        }

        public void writeBlock(long blockId, RecordPack pack) throws IOException {
            Connection conn = null;
            try {
                if (pack instanceof ProtobufRecordPack) {
                    ProtobufRecordPack protoPack = (ProtobufRecordPack)pack;
                    conn = this.getConnection(blockId, protoPack.getCompressOption());
                    this.sendBlock(protoPack, conn);
                } else {
                    Record record;
                    RecordWriter writer = this.openRecordWriter(blockId);
                    RecordReader reader = pack.getRecordReader();
                    while ((record = reader.read()) != null) {
                        writer.write(record);
                    }
                    writer.close();
                }
            }
            catch (OdpsException e) {
                throw new IOException(e.getMessage(), e);
            }
            finally {
                if (null != conn) {
                    conn.disconnect();
                }
            }
        }

        private void sendBlock(ProtobufRecordPack pack, Connection conn) throws IOException {
            if (null == conn) {
                throw new IOException("Invalid connection");
            }
            pack.checkTransConsistency(this.shouldTransform);
            pack.complete();
            ByteArrayOutputStream baos = pack.getProtobufStream();
            baos.writeTo(conn.getOutputStream());
            conn.getOutputStream().close();
            baos.close();
            Response response = conn.getResponse();
            if (!response.isOK()) {
                TunnelException exception = new TunnelException(response.getHeader("x-odps-request-id"), conn.getInputStream(), response.getStatus());
                throw new IOException(exception.getMessage(), exception);
            }
        }

        public RecordWriter openRecordWriter(long blockId) throws TunnelException, IOException {
            return this.openRecordWriter(blockId, false);
        }

        public RecordWriter openRecordWriter(long blockId, boolean compress) throws TunnelException, IOException {
            CompressOption option = compress ? new CompressOption() : new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
            return this.openRecordWriter(blockId, option);
        }

        public RecordWriter openRecordWriter(long blockId, CompressOption compress) throws TunnelException, IOException {
            TunnelRecordWriter writer = null;
            Connection conn = null;
            try {
                conn = this.getConnection(blockId, compress);
                writer = new TunnelRecordWriter(this.schema, conn, compress);
                writer.setTransform(this.shouldTransform);
            }
            catch (IOException e) {
                if (conn != null) {
                    conn.disconnect();
                }
                throw new TunnelException(e.getMessage(), e.getCause());
            }
            catch (TunnelException e) {
                throw e;
            }
            catch (OdpsException e) {
                throw new TunnelException(e.getMessage(), e);
            }
            return writer;
        }

        public RecordWriter openBufferedWriter() throws TunnelException {
            return this.openBufferedWriter(false);
        }

        public RecordWriter openBufferedWriter(boolean compress) throws TunnelException {
            CompressOption compressOption = compress ? this.conf.getCompressOption() : new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
            return this.openBufferedWriter(compressOption);
        }

        public RecordWriter openBufferedWriter(CompressOption compressOption) throws TunnelException {
            try {
                return new TunnelBufferedWriter(this, compressOption);
            }
            catch (IOException e) {
                throw new TunnelException(e.getMessage(), e.getCause());
            }
        }

        private Connection getConnection(long blockId, CompressOption compress) throws OdpsException, IOException {
            HashMap<String, String> params = new HashMap<String, String>();
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("Transfer-Encoding", "chunked");
            headers.put("Content-Type", "application/octet-stream");
            headers.put("x-odps-tunnel-version", String.valueOf(4));
            switch (compress.algorithm) {
                case ODPS_RAW: {
                    break;
                }
                case ODPS_ZLIB: {
                    headers.put("Content-Encoding", "deflate");
                    break;
                }
                case ODPS_SNAPPY: {
                    headers.put("Content-Encoding", "x-snappy-framed");
                    break;
                }
                default: {
                    throw new TunnelException("invalid compression option.");
                }
            }
            params.put("uploadid", this.id);
            params.put("blockid", Long.toString(blockId));
            if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                params.put("partition", this.partitionSpec);
            }
            return this.tunnelServiceClient.connect(this.getResource(), "PUT", params, headers);
        }

        private void reload() throws TunnelException {
            block14: {
                HashMap<String, String> params = new HashMap<String, String>();
                HashMap<String, String> headers = TableTunnel.getCommonHeader();
                params.put("uploadid", this.id);
                if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                    params.put("partition", this.partitionSpec);
                }
                Connection conn = null;
                try {
                    conn = this.tunnelServiceClient.connect(this.getResource(), "GET", params, headers);
                    Response resp = conn.getResponse();
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        this.shouldTransform = StringUtils.equals((String)resp.getHeader("odps-tunnel-date-transform"), (String)"true");
                        break block14;
                    }
                    TunnelException e = new TunnelException(conn.getInputStream());
                    throw e;
                }
                catch (IOException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                catch (TunnelException e) {
                    throw e;
                }
                catch (OdpsException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                finally {
                    if (conn != null) {
                        try {
                            conn.disconnect();
                        }
                        catch (IOException iOException) {}
                    }
                }
            }
        }

        public void commit(Long[] blocks) throws TunnelException, IOException {
            if (blocks == null) {
                throw new IllegalArgumentException("Invalid argument: blocks.");
            }
            HashMap<Long, Boolean> clientBlockMap = new HashMap<Long, Boolean>();
            for (Long blockId : blocks) {
                clientBlockMap.put(blockId, true);
            }
            Long[] serverBlocks = this.getBlockList();
            HashMap<Long, Boolean> serverBlockMap = new HashMap<Long, Boolean>();
            for (Long blockId : serverBlocks) {
                serverBlockMap.put(blockId, true);
            }
            if (serverBlockMap.size() != clientBlockMap.size()) {
                throw new TunnelException("Blocks not match, server: " + serverBlockMap.size() + ", tunnelServiceClient: " + clientBlockMap.size());
            }
            for (Long blockId : blocks) {
                if (serverBlockMap.containsKey(blockId)) continue;
                throw new TunnelException("Block not exsits on server, block id is " + blockId);
            }
            this.completeUpload();
        }

        private void completeUpload() throws TunnelException, IOException {
            HashMap<String, String> params = new HashMap<String, String>();
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("Content-Length", String.valueOf(0));
            params.put("uploadid", this.id);
            if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                params.put("partition", this.partitionSpec);
            }
            RetryStrategy retryStrategy = new RetryStrategy(this.tunnelServiceClient.getRetryTimes(), 5);
            while (true) {
                Connection conn = null;
                try {
                    conn = this.tunnelServiceClient.connect(this.getResource(), "POST", params, headers);
                    Response resp = conn.getResponse();
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        break;
                    }
                    try {
                        throw new TunnelException(resp.getHeader("x-odps-request-id"), conn.getInputStream(), resp.getStatus());
                    }
                    catch (TunnelException e) {
                        try {
                            retryStrategy.onFailure(e);
                            continue;
                        }
                        catch (RetryExceedLimitException ignore) {
                            throw e;
                        }
                        catch (InterruptedException ignore) {
                            throw e;
                        }
                    }
                    catch (OdpsException e) {
                        throw new TunnelException(e.getMessage(), e);
                    }
                }
                finally {
                    if (conn == null) continue;
                    conn.disconnect();
                    continue;
                }
                break;
            }
        }

        public String getId() {
            return this.id;
        }

        public TableSchema getSchema() {
            return this.schema;
        }

        public UploadStatus getStatus() throws TunnelException, IOException {
            this.reload();
            return this.status;
        }

        public Record newRecord() {
            return new ArrayRecord(this.getSchema().getColumns().toArray(new Column[0]));
        }

        public RecordPack newRecordPack() throws IOException {
            return this.newRecordPack(null);
        }

        public RecordPack newRecordPack(CompressOption option) throws IOException {
            return this.newRecordPack(0, option);
        }

        public RecordPack newRecordPack(int capacity, CompressOption option) throws IOException {
            ProtobufRecordPack pack = new ProtobufRecordPack(this.schema, new Checksum(), capacity, option);
            pack.setTransform(this.shouldTransform);
            return pack;
        }

        public Long[] getBlockList() throws TunnelException, IOException {
            this.reload();
            return this.blocks.toArray(new Long[0]);
        }

        private String getResource() {
            return this.conf.getResource(this.projectName, this.tableName);
        }

        private void loadFromJson(InputStream is) throws TunnelException {
            try {
                String json = IOUtils.readStreamAsString(is);
                JsonObject tree = new JsonParser().parse(json).getAsJsonObject();
                if (tree.has("UploadID")) {
                    this.id = tree.get("UploadID").getAsString();
                }
                if (tree.has("Status")) {
                    String uploadStatus = tree.get("Status").getAsString().toUpperCase();
                    this.status = UploadStatus.valueOf(uploadStatus);
                }
                this.blocks.clear();
                if (tree.has("UploadedBlockList")) {
                    JsonArray blockList = tree.get("UploadedBlockList").getAsJsonArray();
                    for (int i = 0; i < blockList.size(); ++i) {
                        if (!blockList.get(i).getAsJsonObject().has("BlockID")) continue;
                        this.blocks.add(blockList.get(i).getAsJsonObject().get("BlockID").getAsLong());
                    }
                }
                if (tree.has("Schema")) {
                    JsonObject tunnelTableSchema = tree.get("Schema").getAsJsonObject();
                    this.schema = new TunnelTableSchema(tunnelTableSchema);
                }
            }
            catch (Exception e) {
                throw new TunnelException("Invalid json content.", e);
            }
        }
    }

    public static enum UploadStatus {
        UNKNOWN,
        NORMAL,
        CLOSING,
        CLOSED,
        CANCELED,
        EXPIRED,
        CRITICAL,
        COMMITTING;

    }

    public static interface StreamUploadSession {
        public void setP2pMode(boolean var1);

        public String getId();

        public TableSchema getSchema();

        public StreamRecordPack newRecordPack() throws IOException;

        public StreamRecordPack newRecordPack(CompressOption var1) throws IOException, TunnelException;

        public Record newRecord();
    }

    public static interface StreamRecordPack {
        public void append(Record var1) throws IOException;

        public long getRecordCount();

        public long getDataSize();

        public String flush() throws IOException;
    }
}

