is used to search the Distributed Hash Table (DHT) in a Peer-to-Peer network, where each peer remembers the addresses of only a few other peers but can still reach whichever peer is responsible for a given key. This demo program is largely based on that idea, although it makes some modifications and simplifications.
Moreover, in this program I use Java UDP sockets to simulate a simple reliable data transfer protocol that can cope with bit errors, packet errors and packet loss. Each message is encapsulated with a SEQ number, an ACK number and a CRC checksum. To simplify application-layer programming, I provide a Client class and a Server class with some convenient APIs. A Server should always alternate between listening and answering, and it can be informed or queried by calling a Client method.
Makefile:
# Build the Chord demo: both sources are compiled into ./bin.
# 'all' and 'clean' are commands, not files, so they are declared phony.
.PHONY: all clean

all: ./bin/Chord.class ./bin/Peer.class

# Chord.java is compiled with ./bin on the classpath.
./bin/Chord.class: ./src/Chord.java ./src/Peer.java
	mkdir -p ./bin
	javac -classpath ./bin/ -d ./bin/ ./src/Chord.java

./bin/Peer.class: ./src/Peer.java
	mkdir -p ./bin
	javac -d ./bin/ ./src/Peer.java

# -f keeps 'make clean' from failing when there is nothing to remove.
clean:
	rm -f ./bin/*.class
1. Chord.java
1 import java.util.concurrent.*; 2 import java.util.*; 3 import java.io.*; 4 5 6 public class Chord extends Thread { 7 private static int num; 8 private static Process[] p; 9 private static Chord[] t;10 private static int[] port;11 private BufferedReader in;12 private PrintWriter out;13 14 public Chord(Process proc) {15 in = new BufferedReader(new InputStreamReader(proc.getInputStream()));16 out = new PrintWriter(new OutputStreamWriter(proc.getOutputStream()));17 start();18 }19 public void run() {20 try {21 out.println(num);22 for (int i=0;i
2. Peer.java
1 import java.util.concurrent.atomic.*; 2 import java.util.concurrent.*; 3 import java.util.*; 4 import java.text.*; 5 import java.net.*; 6 import java.io.*; 7 8 9 abstract class RDT { 10 /** RDT is a class using UDP sockets to simulate 11 * reliable data transfer that implements packet 12 * loss/error detection and retransmission. 13 * An RDT message = SEQ + ACK + content + CRC checksum 14 * if (SEQ==0) 15 * no reply is expected, a client "ACK" message 16 * else 17 * a reply with the identical ACK is expected 18 */ 19 20 private static final int GEN = 0x04C11DB7; 21 private static final int SIZE = 256; 22 protected static final int TIME_LIMIT = 10; 23 protected static Random rand; 24 protected InetAddress addr; 25 protected int targetPort; 26 protected DatagramSocket socket; 27 private DatagramPacket packet; 28 private byte[] buffer; 29 private byte ack = 0; 30 31 static { 32 rand = new Random(); 33 } 34 protected String rcvd() { 35 /* Receive a message with desired ACK # */ 36 try { 37 do { 38 buffer = new byte[SIZE]; 39 packet = new DatagramPacket(buffer,SIZE); 40 socket.receive(packet); 41 } while (checksum()!=0||buffer[1]!=ack); 42 ack = buffer[0]; // next ACK # to be written 43 if (ack!=0) { 44 addr = packet.getAddress(); 45 targetPort = packet.getPort(); 46 } 47 return new String(buffer,"ASCII").substring(2,SIZE-4); 48 } catch (Exception e) { 49 // Timeout Exception: the packet is lost! 
50 return null; 51 } 52 } 53 protected void send(byte seq,String str) { 54 /* Send a message with 'best effort' */ 55 buffer = new byte[SIZE]; 56 buffer[0] = seq; 57 buffer[1] = ack; 58 try { // write the data content 59 byte[] tmp = str.getBytes("ASCII"); 60 System.arraycopy(tmp,0,buffer,2,tmp.length); 61 } catch (Exception e) { 62 e.printStackTrace(); 63 } 64 // write the checksum field 65 int cks = checksum(); 66 buffer[SIZE-4] = (byte)(cks&0xFF); 67 cks >>= 8; 68 buffer[SIZE-3] = (byte)(cks&0xFF); 69 cks >>= 8; 70 buffer[SIZE-2] = (byte)(cks&0xFF); 71 cks >>= 8; 72 buffer[SIZE-1] = (byte)(cks&0xFF); 73 // encapsulate and send the packet 74 packet = new DatagramPacket(buffer,SIZE,addr,targetPort); 75 try { 76 socket.send(packet); 77 } catch (Exception e) { 78 System.out.println("Cannot send msg: "+str); 79 } 80 // next ACK # to be read 81 ack = buffer[0]; 82 } 83 private int checksum() { 84 /* Calculate the CRC checksum */ 85 int rem = 0, N = (SIZE<<3); 86 for (int pos=0;pos>3]>>(pos&7)); 88 if (rem>=0) { 89 rem = (rem<<1)|(tmp&1); 90 } else { 91 rem = (rem<<1)|(tmp&1); 92 rem ^= GEN; 93 } 94 } 95 return revBits(rem); 96 } 97 private int revBits(int val) { 98 /* Reverse the bits of an integer */ 99 int tmp = 0;100 for (int i=0;i<16;i++) {101 tmp |= (((1< <<(31-i-i));102 tmp |= (((1<<(31-i))&val)>>>(31-i-i));103 }104 return tmp;105 }106 protected static boolean isACK(String str) throws Exception {107 /* Test whether a string is equal to "ACK" */108 if (str==null) {109 return false;110 } else {111 byte[] b = str.getBytes("ASCII");112 return (b.length>=3 && b[0]=='A'113 && b[1]=='C' && b[2]=='K');114 }115 }116 }117 118 119 class Client extends RDT {120 /** An RDT client should always send a message first,121 * then receives a reply, and finally send an "ACK"122 * message to terminate the conversation.123 * An "ACK" message from the client indicates that124 * it has received the reply and left.125 */126 127 private Client(int port) {128 try {129 addr = 
InetAddress.getByName("localhost");130 targetPort = port;131 socket = new DatagramSocket();132 socket.setSoTimeout(TIME_LIMIT);133 } catch(Exception e) {134 e.printStackTrace();135 }136 }137 public static String inquire(int port,String str) {138 /* Send a inquiry and return a pending reply */139 Client c = new Client(port);140 String res = null;141 while (res==null) {142 c.send((byte)(1+rand.nextInt(255)),str);143 res = c.rcvd();144 }145 c.send((byte)0,"ACK");146 c.socket.close();147 return res;148 }149 public static void inform(int port,String str) {150 /* Send a piece of info and get an "ACK" */151 Client c = new Client(port);152 String res = null;153 try {154 while (!isACK(res)) {155 c.send((byte)(1+rand.nextInt(255)),str);156 res = c.rcvd();157 }158 } catch (Exception e) {159 e.printStackTrace();160 }161 c.send((byte)0,"ACK");162 c.socket.close();163 }164 }165 166 167 class Server extends RDT {168 /** An RDT server should always listen to a request 169 * first, then respond with a reply, and finally170 * get an "ACK" message.171 * An "ACK" message from the server indicates that172 * it has been informed of something.173 */174 175 public Server(int port) {176 try {177 socket = new DatagramSocket(port);178 socket.setSoTimeout(TIME_LIMIT);179 } catch (Exception e) {180 e.printStackTrace();181 }182 }183 public String listen() {184 String req = null;185 while (req==null) {186 req = rcvd();187 }188 return req;189 }190 public void answer(String str) {191 String res = null;192 try {193 while (!isACK(res)) {194 send((byte)(1+rand.nextInt(255)),str);195 res = rcvd();196 }197 } catch (Exception e) {198 e.printStackTrace();199 }200 }201 }202 203 204 public class Peer extends Thread {205 /** A node on a Peer-to-Peer Network206 * For consecutive peers with indexes i and j:207 * peer[i] is resp for key k iff i <= k < j;208 * finger[k] is the port # of the on-line peer with209 * the minimum index not less than index+2^{k}.210 */211 212 private static final int LEN = 16;213 
private static final int NUM = (1<
References:
1. Kurose, James F., Keith W. Ross. Computer Networking: a top-down approach[M]. 北京:高等教育出版社, 2009-08
2. Tanenbaum, Andrew S., David J. Wetherall. Computer Networks 5th edition[M]. 北京:清华大学出版社, 2011