Coded data transfer in java

RenewableStream

--------- RenewableStreamExample.java ----------

import java.util.*;
import java.util.concurrent.*;
import java.util.zip.*;
import java.io.*;
import java.net.*;
import java.security.*;
import java.security.spec.*;
import javax.crypto.*;
import javax.crypto.spec.*;
import java.nio.charset.*;

public class RenewableStreamExample {

  boolean debug = true;
  String host = "127.0.0.1";
  int port = 502;
  int serverMaxConnections = 3;
  String asymmetricKeyAlgorithm = "RSA/ECB/PKCS1Padding";
  String secretKeyAlgorithm = "AES/ECB/PKCS5Padding";
  String hashAlgorithm = "SHA-1";
  volatile Thread serverThread;
  volatile ExecutorService executorService;
  volatile Map<Runnable, Socket> map;
  volatile ServerSocket serverSocket;

  public static void main(String[] args) throws Exception { new RenewableStreamExample(); }

  RenewableStreamExample() throws Exception {
    startServer();
    for (int i = 0; i < 1; i++) startClient(i);
    Thread.sleep(5_000);
    while (map.size() > 0) Thread.sleep(1_000);
    stopServer();
  }

  void startServer() {
    serverThread = new Thread(() -> {
      try {
        if (debug) System.out.println("Start server");
        KeyPairGenerator kpg = KeyPairGenerator.getInstance(asymmetricKeyAlgorithm.split("/")[0]);
        kpg.initialize(512);
        KeyPair keyPair = kpg.genKeyPair();
        Key publicKey = keyPair.getPublic();
        Key privateKey = keyPair.getPrivate();
        map = Collections.synchronizedMap(new HashMap<>());
        executorService = Executors.newCachedThreadPool((runnable) -> {
          Thread thread = new Thread(runnable);
          //thread.setDaemon(true);
          return thread;
        });
        serverSocket = new ServerSocket(port);
        try {
          while (!Thread.currentThread().isInterrupted()) {
            Socket socket = serverSocket.accept();
            executorService.submit(() -> {
              synchronized (serverThread) {
                if (map.size() >= serverMaxConnections) {
                  try { socket.close(); } catch (Exception e) { }
                  if (debug) System.out.println("Server connection " + map.size() + " reject: " + socket.toString());
                  return;
                }
                map.put(Thread.currentThread(), socket);
              }
              if (debug) System.out.println("Server connection " + map.size() + " accept: " + socket.toString());
              OutputStream cos, dos, os = null;
              InputStream cis, iis, is = null;
              try {
                socket.setSoTimeout(60_000);
                is = socket.getInputStream();
                os = socket.getOutputStream();
                RenwInputStream ris = new RenwInputStream(is);
                RenwOutputStream ros = new RenwOutputStream(os);
                ros.write(publicKey.getEncoded());
                ros.close();
                Cipher cipher = Cipher.getInstance(asymmetricKeyAlgorithm);
                cipher.init(Cipher.DECRYPT_MODE, privateKey);
                byte[] secretKeyBytes = cipher.doFinal(readAllBytes(ris));
                ris.renew();
                cipher = null;
                SecretKey secretKey = new SecretKeySpec(secretKeyBytes, secretKeyAlgorithm.split("/")[0]);
                secretKeyBytes = null;
                Cipher encryptCipher = Cipher.getInstance(secretKeyAlgorithm);
                encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
                Cipher decryptCipher = Cipher.getInstance(secretKeyAlgorithm);
                decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);
                String username = new String(decryptCipher.doFinal(readAllBytes(ris)), StandardCharsets.UTF_8);
                //System.out.println(username);
                ris.renew();
                boolean okPass = Arrays.equals(decryptCipher.doFinal(readAllBytes(ris)), MessageDigest.getInstance(hashAlgorithm).digest("password".getBytes(StandardCharsets.UTF_8)));
                ris.renew();
                boolean compressing = (decryptCipher.doFinal(readAllBytes(ris))[0] != 0);
                ris.renew();
                ros.write(encryptCipher.doFinal(new byte[] {(byte)(okPass ? 1 : 0), (byte)(compressing ? 1 : 0)}));
                ros.close();
                if (!okPass) throw new IOException("password is incorrect");
                Deflater deflater = null;
                Inflater inflater = null;
                if (compressing) {
                  deflater = new Deflater();
                  inflater = new Inflater();
                }
                byte[] buffer = new byte[1024];
                int len;
                while (!Thread.currentThread().isInterrupted()) {
                  cis = new CipherInputStream(ris, decryptCipher);
                  cos = new CipherOutputStream(ros, encryptCipher);
                  if (compressing) {
                    inflater.reset();
                    deflater.reset();
                    iis = new InflaterInputStream(cis, inflater);
                    dos = new DeflaterOutputStream(cos, deflater);
                  } else {
                    iis = cis;
                    dos = cos;
                  }
                  while ((len = iis.read(buffer)) >= 0) {
                    dos.write(buffer, 0, len);
                  }
                  ris.renew();
                  iis.close();
                  dos.close();
                }
              } catch (Exception e) {
                if (debug) System.out.println("Server connection " + e);
              }
              if (is != null) try { is.close(); } catch (Exception e) { }
              if (os != null) try { os.close(); } catch (Exception e) { }
              if (socket != null && !socket.isClosed()) try { socket.close(); } catch (Exception e) { }
              map.remove(Thread.currentThread());
              if (debug) System.out.println("Server connection close: " + socket.toString());
            });
          }
        } catch (Exception e) {
          if (debug) System.out.println("Server " + e);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
    //serverThread.setDaemon(true);
    serverThread.start();
  }

  public void stopServer() {
    if (debug) System.out.println("Stop server");
    int timeout = 10_000;
    serverThread.interrupt();
    executorService.shutdownNow();
    for (int c = 0, step = 500; c < timeout && map.size() > 0; c += step) {
      try { Thread.sleep(step); } catch (Exception e) { }
    }
    map.forEach((thread, socket) -> {
      try { socket.getInputStream().close(); } catch (Exception e) { }
      try { socket.getOutputStream().close(); } catch (Exception e) { }
      try { socket.close(); } catch (Exception e) { }
    });
    if (serverSocket != null && !serverSocket.isClosed()) {
      try { serverSocket.close(); } catch (Exception e) { }
      if (debug) System.out.println("Server close");
    }
  }

  void startClient(int n) {
    Thread clientThread = new Thread(() -> {
      if (debug) System.out.println("Start client " + Thread.currentThread().getName());
      Socket socket = null;
      OutputStream cos, dos, os = null;
      InputStream cis, iis, is = null;
      try {
        String username = "username", password = "password";
        KeyGenerator keyGenerator = KeyGenerator.getInstance(secretKeyAlgorithm.split("/")[0]);
        keyGenerator.init(128);
        SecretKey secretKey = keyGenerator.generateKey();
        Cipher encryptCipher = Cipher.getInstance(secretKeyAlgorithm);
        encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
        Cipher decryptCipher = Cipher.getInstance(secretKeyAlgorithm);
        decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);
        socket = new Socket(host, port);
        socket.setSoTimeout(10_000);
        os = socket.getOutputStream();
        is = socket.getInputStream();
        RenwOutputStream ros = new RenwOutputStream(os);
        RenwInputStream ris = new RenwInputStream(is);
        byte[] publicKeyBytes = readAllBytes(ris);
        ris.renew();
        Key publicKey = KeyFactory.getInstance(asymmetricKeyAlgorithm.split("/")[0]).generatePublic(new X509EncodedKeySpec(publicKeyBytes));
        publicKeyBytes = null;
        Cipher cipher = Cipher.getInstance(asymmetricKeyAlgorithm);
        cipher.init(Cipher.ENCRYPT_MODE, publicKey);
        ros.write(cipher.doFinal(secretKey.getEncoded()));
        ros.close();
        cipher = null;
        ros.write(encryptCipher.doFinal(username.getBytes(StandardCharsets.UTF_8)));
        ros.close();
        ros.write(encryptCipher.doFinal(MessageDigest.getInstance(hashAlgorithm).digest(password.getBytes(StandardCharsets.UTF_8))));
        ros.close();
        boolean compressing = true;
        ros.write(encryptCipher.doFinal(new byte[] {(byte)(compressing ? 1 : 0)}));
        ros.close();
        byte[] statusBytes = decryptCipher.doFinal(readAllBytes(ris));
        ris.renew();
        boolean okPass = (statusBytes[0] != 0);
        compressing = (statusBytes[1] != 0);
        if (!okPass) throw new IOException("password is incorrect");
        System.out.println("compressing: " + (compressing ? "On": "Off"));
        byte[] bt1 = new byte[65_000];
        byte[] bt2 = new byte[bt1.length];
        socket.setReceiveBufferSize((int)(bt1.length * 1.2));  // SocketOptions.SO_RCVBUF
        socket.setSendBufferSize((int)(bt1.length * 1.2));     // SocketOptions.SO_SNDBUF
        for (int i = 0; i < bt1.length; i++)
          //bt1[i] = 0;
          //bt1[i] = (byte)(i & 0xFF);
          bt1[i] = (byte)(Math.random() * 256);
        Deflater deflater = null;
        Inflater inflater = null;
        if (compressing) {
          deflater = new Deflater();
          inflater = new Inflater();
        }
        long now = System.currentTimeMillis();
        byte[] buffer = new byte[1024];
        int i = 0;
        for (; i < 160; i++) {
          cos = new CipherOutputStream(ros, encryptCipher);
          cis = new CipherInputStream(ris, decryptCipher);
          if (compressing) {
            deflater.reset();
            inflater.reset();
            dos = new DeflaterOutputStream(cos, deflater);
            iis = new InflaterInputStream(cis, inflater);
          } else {
            dos = cos;
            iis = cis;
          }
          dos.write(bt1);
          dos.close();
          int len, destPos = 0;
          while ((len = iis.read(buffer)) >= 0) {
            System.arraycopy(buffer, 0, bt2, destPos, len);
            destPos += len;
          }
          System.out.print("*");
          if (!Arrays.equals(bt1, bt2)) System.out.println("ERROR");
          ris.renew();
          iis.close();
        }
        //System.out.println("Elapsed: " + (System.currentTimeMillis() - now) + " ms");
        System.out.printf("Speed: %.2f MB/s%n", 2 * i * bt1.length / (float)((System.currentTimeMillis() - now) * 1_000));
      } catch (Exception e) {
        if (debug) System.out.println("Client " + e);
      }
      if (os != null) try { os.close(); } catch (Exception e) { }
      if (is != null) try { is.close(); } catch (Exception e) { }
      if (socket != null && !socket.isClosed()) try { socket.close(); } catch (Exception e) { }
      if (debug) System.out.println("Client " + Thread.currentThread().getName() + " close");
    });
    clientThread.setName("" + n);
    //clientThread.setDaemon(true);
    clientThread.start();
  }

  public byte[] readAllBytes(InputStream in) throws IOException {  //in.readAllBytes()
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    transferTo(in, baos);
    return baos.toByteArray();
  }

  public long transferTo(InputStream in, OutputStream out) throws IOException {  // in.transferTo(out)
    long transferred = 0;
    byte[] buffer = new byte[32];
    int read;
    while ((read = in.read(buffer)) >= 0) {
      out.write(buffer, 0, read);
      transferred += read;
    }
    out.flush();
    return transferred;
  }

  public class RenwInputStream extends InputStream {

    private InputStream in;
    private byte[] buffer = new byte[127];
    private int pos = 0, length = 0;
    private boolean more = true;

    public RenwInputStream(InputStream in) {
      this.in = in;
    }

    @Override
    public int read() throws IOException {
      if (pos < 0 || (pos >= length && end())) return -1;
      return buffer[pos++] & 0xFF;
    }

    @Override
    public int read(byte bt[], int off, int len) throws IOException {
      if ((off | len | (off + len) | (bt.length - (off + len))) < 0) throw new IndexOutOfBoundsException();
      if (pos < 0) return -1;
      if (len == 0) return 0;
      if (pos >= length && end()) return -1;
      int i = 0;
      for (int count, li; i < len ;) {
        if (pos >= length && end()) break;
        count = length - pos;
        li = len - i;
        if (count > li) count = li;
        System.arraycopy(buffer, pos, bt, off + i, count);
        pos += count;
        i += count;
      }
      return i;
    }

    private boolean end() throws IOException {
      pos = length = 0;
      if (more) {
        int c = in.read();
        if (c == -1) throw new IOException("EOS");
        more = (c & 0x80) != 0;  //0x80 = 1000 0000
        int ln = c & 0x7F;       //0x7F = 0111 1111
        if (ln > 0) {
          int l;
          while ((l = in.read(buffer, length, ln - length)) != -1) {
            length += l;
            if (length >= ln) break;
            //try { Thread.sleep(10); } catch (Exception e) { Thread.currentThread().interrupt(); }
          }
        }
      }
      if (length == 0) {
        more = true;
        pos = -1;
        return true;
      }
      return false;
    }

    public void renew() {
      pos = 0;
    }

  } // End class RenwInputStream

  public class RenwOutputStream extends OutputStream {

    private OutputStream out;
    private byte buffer[];
    private int pos = 0;

    public RenwOutputStream(OutputStream out, int size) {
      if (size < 1 || size > 127) throw new IllegalArgumentException("buffer size is " + size + ", should be 1-127");
      this.out = out;
      this.buffer = new byte[size];
    }

    public RenwOutputStream(OutputStream out) {
      this(out, 127);
    }

    @Override
    public void write(int b) throws IOException {
      if (pos >= buffer.length) flushBuffer(true);
      buffer[pos++] = (byte)b;
    }

    @Override
    public void write(byte bt[], int off, int len) throws IOException {
      if ((off | len | (off + len) | (bt.length - (off + len))) < 0) throw new IndexOutOfBoundsException();
      for (int count, li, i = 0; i < len;) {
        if (pos >= buffer.length) flushBuffer(true);
        count = buffer.length - pos;
        li = len - i;
        if (count > li) count = li;
        System.arraycopy(bt, off + i, buffer, pos, count);
        pos += count;
        i += count;
      }
    }

    private void flushBuffer(boolean more) throws IOException {
      out.write(more ? pos | 0x80 : pos);  //0x80 = 1000 0000
      if (pos > 0) out.write(buffer, 0, pos);
      pos = 0;
    }

    @Override
    public void close() throws IOException {
      flushBuffer(false);
      out.flush();
    }

  } // End class RenwOutputStream
}

Download ZIP

Back