Coded data transfer in java
--------- RenewableStreamExample.java ----------
import java.util.*;
import java.util.concurrent.*;
import java.util.zip.*;
import java.io.*;
import java.net.*;
import java.security.*;
import java.security.spec.*;
import javax.crypto.*;
import javax.crypto.spec.*;
import java.nio.charset.*;
public class RenewableStreamExample {
boolean debug = true;
String host = "127.0.0.1";
int port = 502;
int serverMaxConnections = 3;
String asymmetricKeyAlgorithm = "RSA/ECB/PKCS1Padding";
String secretKeyAlgorithm = "AES/ECB/PKCS5Padding";
String hashAlgorithm = "SHA-1";
volatile Thread serverThread;
volatile ExecutorService executorService;
volatile Map<Runnable, Socket> map;
volatile ServerSocket serverSocket;
public static void main(String[] args) throws Exception { new RenewableStreamExample(); }
RenewableStreamExample() throws Exception {
startServer();
for (int i = 0; i < 1; i++) startClient(i);
Thread.sleep(5_000);
while (map.size() > 0) Thread.sleep(1_000);
stopServer();
}
void startServer() {
serverThread = new Thread(() -> {
try {
if (debug) System.out.println("Start server");
KeyPairGenerator kpg = KeyPairGenerator.getInstance(asymmetricKeyAlgorithm.split("/")[0]);
kpg.initialize(512);
KeyPair keyPair = kpg.genKeyPair();
Key publicKey = keyPair.getPublic();
Key privateKey = keyPair.getPrivate();
map = Collections.synchronizedMap(new HashMap<>());
executorService = Executors.newCachedThreadPool((runnable) -> {
Thread thread = new Thread(runnable);
//thread.setDaemon(true);
return thread;
});
serverSocket = new ServerSocket(port);
try {
while (!Thread.currentThread().isInterrupted()) {
Socket socket = serverSocket.accept();
executorService.submit(() -> {
synchronized (serverThread) {
if (map.size() >= serverMaxConnections) {
try { socket.close(); } catch (Exception e) { }
if (debug) System.out.println("Server connection " + map.size() + " reject: " + socket.toString());
return;
}
map.put(Thread.currentThread(), socket);
}
if (debug) System.out.println("Server connection " + map.size() + " accept: " + socket.toString());
OutputStream cos, dos, os = null;
InputStream cis, iis, is = null;
try {
socket.setSoTimeout(60_000);
is = socket.getInputStream();
os = socket.getOutputStream();
RenwInputStream ris = new RenwInputStream(is);
RenwOutputStream ros = new RenwOutputStream(os);
ros.write(publicKey.getEncoded());
ros.close();
Cipher cipher = Cipher.getInstance(asymmetricKeyAlgorithm);
cipher.init(Cipher.DECRYPT_MODE, privateKey);
byte[] secretKeyBytes = cipher.doFinal(readAllBytes(ris));
ris.renew();
cipher = null;
SecretKey secretKey = new SecretKeySpec(secretKeyBytes, secretKeyAlgorithm.split("/")[0]);
secretKeyBytes = null;
Cipher encryptCipher = Cipher.getInstance(secretKeyAlgorithm);
encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
Cipher decryptCipher = Cipher.getInstance(secretKeyAlgorithm);
decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);
String username = new String(decryptCipher.doFinal(readAllBytes(ris)), StandardCharsets.UTF_8);
//System.out.println(username);
ris.renew();
boolean okPass = Arrays.equals(decryptCipher.doFinal(readAllBytes(ris)), MessageDigest.getInstance(hashAlgorithm).digest("password".getBytes(StandardCharsets.UTF_8)));
ris.renew();
boolean compressing = (decryptCipher.doFinal(readAllBytes(ris))[0] != 0);
ris.renew();
ros.write(encryptCipher.doFinal(new byte[] {(byte)(okPass ? 1 : 0), (byte)(compressing ? 1 : 0)}));
ros.close();
if (!okPass) throw new IOException("password is incorrect");
Deflater deflater = null;
Inflater inflater = null;
if (compressing) {
deflater = new Deflater();
inflater = new Inflater();
}
byte[] buffer = new byte[1024];
int len;
while (!Thread.currentThread().isInterrupted()) {
cis = new CipherInputStream(ris, decryptCipher);
cos = new CipherOutputStream(ros, encryptCipher);
if (compressing) {
inflater.reset();
deflater.reset();
iis = new InflaterInputStream(cis, inflater);
dos = new DeflaterOutputStream(cos, deflater);
} else {
iis = cis;
dos = cos;
}
while ((len = iis.read(buffer)) >= 0) {
dos.write(buffer, 0, len);
}
ris.renew();
iis.close();
dos.close();
}
} catch (Exception e) {
if (debug) System.out.println("Server connection " + e);
}
if (is != null) try { is.close(); } catch (Exception e) { }
if (os != null) try { os.close(); } catch (Exception e) { }
if (socket != null && !socket.isClosed()) try { socket.close(); } catch (Exception e) { }
map.remove(Thread.currentThread());
if (debug) System.out.println("Server connection close: " + socket.toString());
});
}
} catch (Exception e) {
if (debug) System.out.println("Server " + e);
}
} catch (Exception e) {
e.printStackTrace();
}
});
//serverThread.setDaemon(true);
serverThread.start();
}
public void stopServer() {
if (debug) System.out.println("Stop server");
int timeout = 10_000;
serverThread.interrupt();
executorService.shutdownNow();
for (int c = 0, step = 500; c < timeout && map.size() > 0; c += step) {
try { Thread.sleep(step); } catch (Exception e) { }
}
map.forEach((thread, socket) -> {
try { socket.getInputStream().close(); } catch (Exception e) { }
try { socket.getOutputStream().close(); } catch (Exception e) { }
try { socket.close(); } catch (Exception e) { }
});
if (serverSocket != null && !serverSocket.isClosed()) {
try { serverSocket.close(); } catch (Exception e) { }
if (debug) System.out.println("Server close");
}
}
void startClient(int n) {
Thread clientThread = new Thread(() -> {
if (debug) System.out.println("Start client " + Thread.currentThread().getName());
Socket socket = null;
OutputStream cos, dos, os = null;
InputStream cis, iis, is = null;
try {
String username = "username", password = "password";
KeyGenerator keyGenerator = KeyGenerator.getInstance(secretKeyAlgorithm.split("/")[0]);
keyGenerator.init(128);
SecretKey secretKey = keyGenerator.generateKey();
Cipher encryptCipher = Cipher.getInstance(secretKeyAlgorithm);
encryptCipher.init(Cipher.ENCRYPT_MODE, secretKey);
Cipher decryptCipher = Cipher.getInstance(secretKeyAlgorithm);
decryptCipher.init(Cipher.DECRYPT_MODE, secretKey);
socket = new Socket(host, port);
socket.setSoTimeout(10_000);
os = socket.getOutputStream();
is = socket.getInputStream();
RenwOutputStream ros = new RenwOutputStream(os);
RenwInputStream ris = new RenwInputStream(is);
byte[] publicKeyBytes = readAllBytes(ris);
ris.renew();
Key publicKey = KeyFactory.getInstance(asymmetricKeyAlgorithm.split("/")[0]).generatePublic(new X509EncodedKeySpec(publicKeyBytes));
publicKeyBytes = null;
Cipher cipher = Cipher.getInstance(asymmetricKeyAlgorithm);
cipher.init(Cipher.ENCRYPT_MODE, publicKey);
ros.write(cipher.doFinal(secretKey.getEncoded()));
ros.close();
cipher = null;
ros.write(encryptCipher.doFinal(username.getBytes(StandardCharsets.UTF_8)));
ros.close();
ros.write(encryptCipher.doFinal(MessageDigest.getInstance(hashAlgorithm).digest(password.getBytes(StandardCharsets.UTF_8))));
ros.close();
boolean compressing = true;
ros.write(encryptCipher.doFinal(new byte[] {(byte)(compressing ? 1 : 0)}));
ros.close();
byte[] statusBytes = decryptCipher.doFinal(readAllBytes(ris));
ris.renew();
boolean okPass = (statusBytes[0] != 0);
compressing = (statusBytes[1] != 0);
if (!okPass) throw new IOException("password is incorrect");
System.out.println("compressing: " + (compressing ? "On": "Off"));
byte[] bt1 = new byte[65_000];
byte[] bt2 = new byte[bt1.length];
socket.setReceiveBufferSize((int)(bt1.length * 1.2)); // SocketOptions.SO_RCVBUF
socket.setSendBufferSize((int)(bt1.length * 1.2)); // SocketOptions.SO_SNDBUF
for (int i = 0; i < bt1.length; i++)
//bt1[i] = 0;
//bt1[i] = (byte)(i & 0xFF);
bt1[i] = (byte)(Math.random() * 256);
Deflater deflater = null;
Inflater inflater = null;
if (compressing) {
deflater = new Deflater();
inflater = new Inflater();
}
long now = System.currentTimeMillis();
byte[] buffer = new byte[1024];
int i = 0;
for (; i < 160; i++) {
cos = new CipherOutputStream(ros, encryptCipher);
cis = new CipherInputStream(ris, decryptCipher);
if (compressing) {
deflater.reset();
inflater.reset();
dos = new DeflaterOutputStream(cos, deflater);
iis = new InflaterInputStream(cis, inflater);
} else {
dos = cos;
iis = cis;
}
dos.write(bt1);
dos.close();
int len, destPos = 0;
while ((len = iis.read(buffer)) >= 0) {
System.arraycopy(buffer, 0, bt2, destPos, len);
destPos += len;
}
System.out.print("*");
if (!Arrays.equals(bt1, bt2)) System.out.println("ERROR");
ris.renew();
iis.close();
}
//System.out.println("Elapsed: " + (System.currentTimeMillis() - now) + " ms");
System.out.printf("Speed: %.2f MB/s%n", 2 * i * bt1.length / (float)((System.currentTimeMillis() - now) * 1_000));
} catch (Exception e) {
if (debug) System.out.println("Client " + e);
}
if (os != null) try { os.close(); } catch (Exception e) { }
if (is != null) try { is.close(); } catch (Exception e) { }
if (socket != null && !socket.isClosed()) try { socket.close(); } catch (Exception e) { }
if (debug) System.out.println("Client " + Thread.currentThread().getName() + " close");
});
clientThread.setName("" + n);
//clientThread.setDaemon(true);
clientThread.start();
}
public byte[] readAllBytes(InputStream in) throws IOException { //in.readAllBytes()
ByteArrayOutputStream baos = new ByteArrayOutputStream();
transferTo(in, baos);
return baos.toByteArray();
}
public long transferTo(InputStream in, OutputStream out) throws IOException { // in.transferTo(out)
long transferred = 0;
byte[] buffer = new byte[32];
int read;
while ((read = in.read(buffer)) >= 0) {
out.write(buffer, 0, read);
transferred += read;
}
out.flush();
return transferred;
}
public class RenwInputStream extends InputStream {
private InputStream in;
private byte[] buffer = new byte[127];
private int pos = 0, length = 0;
private boolean more = true;
public RenwInputStream(InputStream in) {
this.in = in;
}
@Override
public int read() throws IOException {
if (pos < 0 || (pos >= length && end())) return -1;
return buffer[pos++] & 0xFF;
}
@Override
public int read(byte bt[], int off, int len) throws IOException {
if ((off | len | (off + len) | (bt.length - (off + len))) < 0) throw new IndexOutOfBoundsException();
if (pos < 0) return -1;
if (len == 0) return 0;
if (pos >= length && end()) return -1;
int i = 0;
for (int count, li; i < len ;) {
if (pos >= length && end()) break;
count = length - pos;
li = len - i;
if (count > li) count = li;
System.arraycopy(buffer, pos, bt, off + i, count);
pos += count;
i += count;
}
return i;
}
private boolean end() throws IOException {
pos = length = 0;
if (more) {
int c = in.read();
if (c == -1) throw new IOException("EOS");
more = (c & 0x80) != 0; //0x80 = 1000 0000
int ln = c & 0x7F; //0x7F = 0111 1111
if (ln > 0) {
int l;
while ((l = in.read(buffer, length, ln - length)) != -1) {
length += l;
if (length >= ln) break;
//try { Thread.sleep(10); } catch (Exception e) { Thread.currentThread().interrupt(); }
}
}
}
if (length == 0) {
more = true;
pos = -1;
return true;
}
return false;
}
public void renew() {
pos = 0;
}
} // End class RenwInputStream
public class RenwOutputStream extends OutputStream {
private OutputStream out;
private byte buffer[];
private int pos = 0;
public RenwOutputStream(OutputStream out, int size) {
if (size < 1 || size > 127) throw new IllegalArgumentException("buffer size is " + size + ", should be 1-127");
this.out = out;
this.buffer = new byte[size];
}
public RenwOutputStream(OutputStream out) {
this(out, 127);
}
@Override
public void write(int b) throws IOException {
if (pos >= buffer.length) flushBuffer(true);
buffer[pos++] = (byte)b;
}
@Override
public void write(byte bt[], int off, int len) throws IOException {
if ((off | len | (off + len) | (bt.length - (off + len))) < 0) throw new IndexOutOfBoundsException();
for (int count, li, i = 0; i < len;) {
if (pos >= buffer.length) flushBuffer(true);
count = buffer.length - pos;
li = len - i;
if (count > li) count = li;
System.arraycopy(bt, off + i, buffer, pos, count);
pos += count;
i += count;
}
}
private void flushBuffer(boolean more) throws IOException {
out.write(more ? pos | 0x80 : pos); //0x80 = 1000 0000
if (pos > 0) out.write(buffer, 0, pos);
pos = 0;
}
@Override
public void close() throws IOException {
flushBuffer(false);
out.flush();
}
} // End class RenwOutputStream
}