# Build rules for the yfs labs.
#
# LAB selects which lab is built: "make" (or "make lab") builds lab$(LAB).
# SOL is passed straight through to the compiler as -DSOL=$(SOL).
LAB=1
SOL=0
RPC=./rpc

# LABnGE is 1 when LAB >= n and 0 otherwise (the output of expr); the ifeq
# blocks below use these flags to add the sources each later lab needs.
LAB1GE=$(shell expr $(LAB) \>\= 1)
LAB2GE=$(shell expr $(LAB) \>\= 2)
LAB3GE=$(shell expr $(LAB) \>\= 3)
LAB4GE=$(shell expr $(LAB) \>\= 4)
LAB5GE=$(shell expr $(LAB) \>\= 5)
LAB6GE=$(shell expr $(LAB) \>\= 6)
LAB7GE=$(shell expr $(LAB) \>\= 7)
CXXFLAGS = -g -MMD -Wall -I. -I$(RPC) -DLAB=$(LAB) -DSOL=$(SOL) -D_FILE_OFFSET_BITS=64

ifeq ($(shell uname -s),Darwin)
  MACFLAGS= -D__FreeBSD__=10
else
  MACFLAGS=
endif
LDFLAGS = -L. -L/usr/local/lib
LDLIBS = -lpthread

# Link librt / libdl only when the toolchain actually ships them.
LDLIBS += $(shell test -f `gcc -print-file-name=librt.so` && echo -lrt)
LDLIBS += $(shell test -f `gcc -print-file-name=libdl.so` && echo -ldl)
CC = g++
CXX = g++

lab: lab$(LAB)
lab1: lab1_tester
lab2: yfs_client
lab3: yfs_client extent_server lock_server test-lab-3-b test-lab-3-c
lab4: yfs_client extent_server lock_server lock_tester test-lab-3-b\
	test-lab-3-c
lab5: yfs_client extent_server lock_server test-lab-3-b test-lab-3-c
lab6: lock_server rsm_tester
lab7: lock_tester lock_server rsm_tester

hfiles1=rpc/fifo.h rpc/connection.h rpc/rpc.h rpc/marshall.h rpc/method_thread.h\
	rpc/thr_pool.h rpc/pollmgr.h rpc/jsl_log.h rpc/slock.h rpc/rpctest.cc\
	lock_protocol.h lock_server.h lock_client.h gettime.h gettime.cc lang/verify.h \
	lang/algorithm.h
hfiles2=yfs_client.h extent_client.h extent_protocol.h extent_server.h
hfiles3=lock_client_cache.h lock_server_cache.h handle.h tprintf.h
hfiles4=log.h rsm.h rsm_protocol.h config.h paxos.h paxos_protocol.h rsm_state_transfer.h rsmtest_client.h tprintf.h
hfiles5=rsm_state_transfer.h rsm_client.h
rsm_files = rsm.cc paxos.cc config.cc log.cc handle.cc

rpclib=rpc/rpc.cc rpc/connection.cc rpc/pollmgr.cc rpc/thr_pool.cc rpc/jsl_log.cc gettime.cc
rpc/librpc.a: $(patsubst %.cc,%.o,$(rpclib))
	rm -f $@
	ar cq $@ $^
	ranlib rpc/librpc.a

# The source list must be named "rpctest": that is the variable the rule
# below expands (it was previously defined under the name "rpc/rpctest",
# so the rule saw an empty object list).
rpctest=rpc/rpctest.cc
rpc/rpctest: $(patsubst %.cc,%.o,$(rpctest)) rpc/librpc.a

lock_demo=lock_demo.cc lock_client.cc
lock_demo : $(patsubst %.cc,%.o,$(lock_demo)) rpc/librpc.a

lock_tester=lock_tester.cc lock_client.cc
ifeq ($(LAB4GE),1)
  lock_tester += lock_client_cache.cc
endif
ifeq ($(LAB7GE),1)
  lock_tester+=rsm_client.cc handle.cc lock_client_cache_rsm.cc
endif
lock_tester : $(patsubst %.cc,%.o,$(lock_tester)) rpc/librpc.a

lock_server=lock_server.cc lock_smain.cc
ifeq ($(LAB4GE),1)
  lock_server+=lock_server_cache.cc handle.cc
endif
ifeq ($(LAB6GE),1)
  lock_server+= $(rsm_files)
endif
ifeq ($(LAB7GE),1)
  lock_server+= lock_server_cache_rsm.cc
endif

lock_server : $(patsubst %.cc,%.o,$(lock_server)) rpc/librpc.a

lab1_tester=lab1_tester.cc extent_client.cc extent_server.cc inode_manager.cc
lab1_tester : $(patsubst %.cc,%.o,$(lab1_tester))
yfs_client=yfs_client.cc extent_client.cc fuse.cc extent_server.cc inode_manager.cc
ifeq ($(LAB3GE),1)
  yfs_client += lock_client.cc
endif
ifeq ($(LAB7GE),1)
  yfs_client += rsm_client.cc lock_client_cache_rsm.cc
endif
ifeq ($(LAB4GE),1)
  yfs_client += lock_client_cache.cc
endif
yfs_client : $(patsubst %.cc,%.o,$(yfs_client)) rpc/librpc.a

extent_server=extent_server.cc extent_smain.cc
extent_server : $(patsubst %.cc,%.o,$(extent_server)) rpc/librpc.a

# Each test rule expands the variable defined on the line above it, and the
# target names match the ones listed in lab3/lab4/lab5 and in clean_files.
test-lab-3-b=test-lab-3-b.c
test-lab-3-b: $(patsubst %.c,%.o,$(test-lab-3-b)) rpc/librpc.a

test-lab-3-c=test-lab-3-c.c
test-lab-3-c: $(patsubst %.c,%.o,$(test-lab-3-c)) rpc/librpc.a

rsm_tester=rsm_tester.cc rsmtest_client.cc
rsm_tester: $(patsubst %.cc,%.o,$(rsm_tester)) rpc/librpc.a

%.o: %.cc
	$(CXX) $(CXXFLAGS) -c $< -o $@

fuse.o: fuse.cc
	$(CXX) -c $(CXXFLAGS) $(FUSEFLAGS) $(MACFLAGS) $<

# mklab.inc is needed by 6.824 staff only. Just ignore it.
-include mklab.inc

# Dependency files produced by -MMD.
-include *.d
-include rpc/*.d

clean_files=rpc/rpctest rpc/*.o rpc/*.d rpc/librpc.a *.o *.d yfs_client extent_server lock_server lock_tester lock_demo rpctest test-lab-3-b test-lab-3-c rsm_tester lab1_tester
.PHONY: clean handin lab lab1 lab2 lab3 lab4 lab5 lab6 lab7
clean:
	rm -rf $(clean_files)

handin_ignore=$(clean_files) core* *log
handin_file=lab$(LAB).tgz
labdir=$(shell basename $(PWD))
handin:
	@bash -c "cd ../; tar -X <(tr ' ' '\n' < <(echo '$(handin_ignore)')) -czvf $(handin_file) $(labdir); mv $(handin_file) $(labdir); cd $(labdir)"
	@echo Please modify lab1.tgz to lab1_[your student id].tgz and upload it to ftp://ytliu.cc:public@public.sjtu.edu.cn/upload/
	@echo Thanks!
// Shared vocabulary of the extent service: status codes, RPC procedure
// numbers, extent types and the attribute record returned by getattr.
class extent_protocol {
 public:
  // Status value returned by every extent operation (one of xxstatus).
  typedef int status;
  // Identifier of an extent.
  typedef unsigned long long extentid_t;
  enum xxstatus { OK, RPCERR, NOENT, IOERR };
  // RPC procedure numbers; the values run consecutively from 0x6001.
  enum rpc_numbers {
    put = 0x6001,
    get,
    getattr,
    remove
  };

  // Extent kinds. Values start at 1, so 0 never names a valid type.
  enum types {
    T_DIR = 1,
    T_FILE
  };

  // Attributes of one extent.
  struct attr {
    uint32_t type;       // one of `types`
    unsigned int atime;  // presumably last-access time -- units not shown here
    unsigned int mtime;  // presumably last-modification time
    unsigned int ctime;  // presumably last-change time
    unsigned int size;   // presumably the extent's size in bytes
  };
};

// Unmarshalls an attr; fields are read in declaration order
// (type, atime, mtime, ctime, size), mirroring operator<< below.
inline unmarshall &
operator>>(unmarshall &u, extent_protocol::attr &a)
{
  u >> a.type;
  u >> a.atime;
  u >> a.mtime;
  u >> a.ctime;
  u >> a.size;
  return u;
}

// Marshalls an attr; fields are written in declaration order
// (type, atime, mtime, ctime, size), mirroring operator>> above.
inline marshall &
operator<<(marshall &m, extent_protocol::attr a)
{
  m << a.type;
  m << a.atime;
  m << a.mtime;
  m << a.ctime;
  m << a.size;
  return m;
}
b/extent_server.cc @@ -0,0 +1,80 @@ +// the extent server implementation + +#include "extent_server.h" +#include +#include +#include +#include +#include +#include +#include + +extent_server::extent_server() +{ + im = new inode_manager(); +} + +int extent_server::create(uint32_t type, extent_protocol::extentid_t &id) +{ + // alloc a new inode and return inum + printf("extent_server: create inode\n"); + id = im->alloc_inode(type); + + return extent_protocol::OK; +} + +int extent_server::put(extent_protocol::extentid_t id, std::string buf, int &) +{ + id &= 0x7fffffff; + + const char * cbuf = buf.c_str(); + int size = buf.size(); + im->write_file(id, cbuf, size); + + return extent_protocol::OK; +} + +int extent_server::get(extent_protocol::extentid_t id, std::string &buf) +{ + printf("extent_server: get %lld\n", id); + + id &= 0x7fffffff; + + int size = 0; + char *cbuf = NULL; + + im->read_file(id, &cbuf, &size); + if (size == 0) + buf = ""; + else { + buf.assign(cbuf, size); + free(cbuf); + } + + return extent_protocol::OK; +} + +int extent_server::getattr(extent_protocol::extentid_t id, extent_protocol::attr &a) +{ + printf("extent_server: getattr %lld\n", id); + + id &= 0x7fffffff; + + extent_protocol::attr attr; + memset(&attr, 0, sizeof(attr)); + im->getattr(id, attr); + a = attr; + + return extent_protocol::OK; +} + +int extent_server::remove(extent_protocol::extentid_t id, int &) +{ + printf("extent_server: write %lld\n", id); + + id &= 0x7fffffff; + im->remove_file(id); + + return extent_protocol::OK; +} + diff --git a/extent_server.h b/extent_server.h new file mode 100644 index 0000000..4efece4 --- /dev/null +++ b/extent_server.h @@ -0,0 +1,39 @@ +// this is the extent server + +#ifndef extent_server_h +#define extent_server_h + +#include +#include +#include "extent_protocol.h" +#include "inode_manager.h" + +class extent_server { + protected: +#if 0 + typedef struct extent { + std::string data; + struct extent_protocol::attr attr; + } extent_t; + std::map 
extents; +#endif + inode_manager *im; + + public: + extent_server(); + + int create(uint32_t type, extent_protocol::extentid_t &id); + int put(extent_protocol::extentid_t id, std::string, int &); + int get(extent_protocol::extentid_t id, std::string &); + int getattr(extent_protocol::extentid_t id, extent_protocol::attr &); + int remove(extent_protocol::extentid_t id, int &); +}; + +#endif + + + + + + + diff --git a/extent_smain.cc b/extent_smain.cc new file mode 100644 index 0000000..bd3024b --- /dev/null +++ b/extent_smain.cc @@ -0,0 +1,36 @@ +#include "rpc.h" +#include +#include +#include +#include "extent_server.h" + +// Main loop of extent server + +int +main(int argc, char *argv[]) +{ + int count = 0; + + if(argc != 2){ + fprintf(stderr, "Usage: %s port\n", argv[0]); + exit(1); + } + + setvbuf(stdout, NULL, _IONBF, 0); + + char *count_env = getenv("RPC_COUNT"); + if(count_env != NULL){ + count = atoi(count_env); + } + + rpcs server(atoi(argv[1]), count); + extent_server ls; + + server.reg(extent_protocol::get, &ls, &extent_server::get); + server.reg(extent_protocol::getattr, &ls, &extent_server::getattr); + server.reg(extent_protocol::put, &ls, &extent_server::put); + server.reg(extent_protocol::remove, &ls, &extent_server::remove); + + while(1) + sleep(1000); +} diff --git a/gettime.cc b/gettime.cc new file mode 100644 index 0000000..cd78779 --- /dev/null +++ b/gettime.cc @@ -0,0 +1,139 @@ +/* + * Copyright (c), MM Weiss + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modification, + * are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. 
+ * + * 3. Neither the name of the MM Weiss nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT + * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR + * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * clock_gettime_stub.c + * gcc -Wall -c clock_gettime_stub.c + * posix realtime functions; MacOS user space glue + */ + +/* @comment + * other possible implementation using intel builtin rdtsc + * rdtsc-workaround: http://www.mcs.anl.gov/~kazutomo/rdtsc.html + * + * we could get the ticks by doing this + * + * __asm __volatile("mov %%ebx, %%esi\n\t" + * "cpuid\n\t" + * "xchg %%esi, %%ebx\n\t" + * "rdtsc" + * : "=a" (a), + * "=d" (d) + * ); + + * we could even replace our tricky sched_yield call by assembly code to get a better accurency, + * anyway the following C stub will satisfy 99% of apps using posix clock_gettime call, + * moreover, the setter version (clock_settime) could be easly written using mach primitives: + * http://www.opensource.apple.com/source/xnu/xnu-${VERSION}/osfmk/man/ (clock_[set|get]_time) + * + * hackers don't be crackers, don't you use a flush toilet? 
+ * + * + * @see draft: ./posix-realtime-stub/posix-realtime-stub.c + * + */ + + +#ifdef __APPLE__ + +#pragma weak clock_gettime + +#include +#include +#include +#include +#include +#include +#include +#include + +typedef enum { + CLOCK_REALTIME, + CLOCK_MONOTONIC, + CLOCK_PROCESS_CPUTIME_ID, + CLOCK_THREAD_CPUTIME_ID +} clockid_t; + +static mach_timebase_info_data_t __clock_gettime_inf; + +int clock_gettime(clockid_t clk_id, struct timespec *tp) { + kern_return_t ret; + clock_serv_t clk; + clock_id_t clk_serv_id; + mach_timespec_t tm; + + uint64_t start, end, delta, nano; + + task_basic_info_data_t tinfo; + task_thread_times_info_data_t ttinfo; + mach_msg_type_number_t tflag; + + int retval = -1; + switch (clk_id) { + case CLOCK_REALTIME: + case CLOCK_MONOTONIC: + clk_serv_id = clk_id == CLOCK_REALTIME ? CALENDAR_CLOCK : SYSTEM_CLOCK; + if (KERN_SUCCESS == (ret = host_get_clock_service(mach_host_self(), clk_serv_id, &clk))) { + if (KERN_SUCCESS == (ret = clock_get_time(clk, &tm))) { + tp->tv_sec = tm.tv_sec; + tp->tv_nsec = tm.tv_nsec; + retval = 0; + } + } + if (KERN_SUCCESS != ret) { + errno = EINVAL; + retval = -1; + } + break; + case CLOCK_PROCESS_CPUTIME_ID: + case CLOCK_THREAD_CPUTIME_ID: + start = mach_absolute_time(); + if (clk_id == CLOCK_PROCESS_CPUTIME_ID) { + getpid(); + } else { + sched_yield(); + } + end = mach_absolute_time(); + delta = end - start; + if (0 == __clock_gettime_inf.denom) { + mach_timebase_info(&__clock_gettime_inf); + } + nano = delta * __clock_gettime_inf.numer / __clock_gettime_inf.denom; + tp->tv_sec = nano * 1e-9; + tp->tv_nsec = nano - (tp->tv_sec * 1e9); + retval = 0; + break; + default: + errno = EINVAL; + retval = -1; + } + return retval; +} + +#endif // __APPLE__ diff --git a/gettime.h b/gettime.h new file mode 100644 index 0000000..f10def4 --- /dev/null +++ b/gettime.h @@ -0,0 +1,15 @@ +#ifndef gettime_h +#define gettime_h + +#ifdef __APPLE__ +typedef enum { + CLOCK_REALTIME, + CLOCK_MONOTONIC, + CLOCK_PROCESS_CPUTIME_ID, + 
CLOCK_THREAD_CPUTIME_ID +} clockid_t; + +int clock_gettime(clockid_t clk_id, struct timespec *tp); +#endif + +#endif diff --git a/inode_manager.cc b/inode_manager.cc new file mode 100644 index 0000000..49809dc --- /dev/null +++ b/inode_manager.cc @@ -0,0 +1,214 @@ +#include "inode_manager.h" + +// disk layer ----------------------------------------- + +disk::disk() +{ + bzero(blocks, sizeof(blocks)); +} + +void +disk::read_block(blockid_t id, char *buf) +{ + if (id < 0 || id > BLOCK_NUM || buf == NULL) + return; + + memcpy(buf, blocks[id], BLOCK_SIZE); +} + +void +disk::write_block(blockid_t id, const char *buf) +{ + if (id < 0 || id > BLOCK_NUM || buf == NULL) + return; + + memcpy(blocks[id], buf, BLOCK_SIZE); +} + +// block layer ----------------------------------------- + +// Allocate a free disk block. +blockid_t +block_manager::alloc_block() +{ + /* + * your lab1 code goes here. + * note: you should mark the corresponding bit in block bitmap when alloc. + * you need to think about which block you can start to be allocated. + */ + + return 0; +} + +void +block_manager::free_block(uint32_t id) +{ + /* + * your lab1 code goes here. + * note: you should unmark the corresponding bit in the block bitmap when free. + */ + + return; +} + +// The layout of disk should be like this: +// |<-sb->|<-inode table->|<-free block bitmap->|<-data->| +block_manager::block_manager() +{ + d = new disk(); + + // format the disk + sb.size = BLOCK_SIZE * BLOCK_NUM; + sb.nblocks = BLOCK_NUM; + sb.ninodes = INODE_NUM; + +} + +void +block_manager::read_block(uint32_t id, char *buf) +{ + d->read_block(id, buf); +} + +void +block_manager::write_block(uint32_t id, const char *buf) +{ + d->write_block(id, buf); +} + +// inode layer ----------------------------------------- + +inode_manager::inode_manager() +{ + bm = new block_manager(); + uint32_t root_dir = alloc_inode(extent_protocol::T_DIR); + if (root_dir != 1) { + printf("\tim: error! 
alloc first inode %d, should be 1\n", root_dir); + exit(0); + } +} + +/* Create a new file. + * Return its inum. */ +uint32_t +inode_manager::alloc_inode(uint32_t type) +{ + /* + * your lab1 code gose here. + * note: the normal inode block should begin from the 2nd inode block. + * the 1st is used for root_dir, see inode_manager::inode_manager(). + */ + return 1; +} + +void +inode_manager::free_inode(uint32_t inum) +{ + /* + * your lab1 code gose here. + * note: you need to check if the inode is already a freed one; + * if not, clear it, and remember to write back to disk. + */ + + return; +} + + +/* Return an inode structure by inum, NULL otherwise. + * Caller should release the memory. */ +struct inode* +inode_manager::get_inode(uint32_t inum) +{ + struct inode *ino, *ino_disk; + char buf[BLOCK_SIZE]; + + printf("\tim: get_inode %d\n", inum); + + if (inum < 0 || inum >= INODE_NUM) { + printf("\tim: inum out of range\n"); + return NULL; + } + + bm->read_block(IBLOCK(inum), buf); + // printf("%s:%d\n", __FILE__, __LINE__); + + ino_disk = (struct inode*)buf + inum%IPB; + if (ino_disk->type == 0) { + printf("\tim: inode not exist\n"); + return NULL; + } + + ino = (struct inode*)malloc(sizeof(struct inode)); + *ino = *ino_disk; + + return ino; +} + +void +inode_manager::put_inode(uint32_t inum, struct inode *ino) +{ + char buf[BLOCK_SIZE]; + struct inode *ino_disk; + + printf("\tim: put_inode %d\n", inum); + if (ino == NULL) + return; + + bm->read_block(IBLOCK(inum), buf); + ino_disk = (struct inode*)buf + inum%IPB; + *ino_disk = *ino; + bm->write_block(IBLOCK(inum), buf); +} + +#define MIN(a,b) ((a)<(b) ? (a) : (b)) + +/* Get all the data of a file by inum. + * Return alloced data, should be freed by caller. */ +void +inode_manager::read_file(uint32_t inum, char **buf_out, int *size) +{ + /* + * your lab1 code goes here. 
/* Lab 1 stub: intended to store `size` bytes from buf as the new content of
 * inode inum, allocating or freeing data blocks as needed.
 * Currently does nothing. */
void
inode_manager::write_file(uint32_t inum, const char *buf, int size)
{
  /*
   * your lab1 code goes here.
   * note: write buf to blocks of inode inum.
   * you need to consider the situation when the size of buf
   * is larger or smaller than the size of the original inode
   */

  return;
}

/* Lab 1 stub: intended to fill `a` with the attributes of inode inum.
 * Currently leaves `a` untouched. */
void
inode_manager::getattr(uint32_t inum, extent_protocol::attr &a)
{
  /*
   * your lab1 code goes here.
   * note: get the attributes of inode inum.
   * you can refer to "struct attr" in extent_protocol.h
   */

  return;
}

/* Lab 1 stub: intended to release both the data blocks and the inode of
 * file inum. Currently does nothing. */
void
inode_manager::remove_file(uint32_t inum)
{
  /*
   * your lab1 code goes here.
   * note: you need to consider both the data blocks and the inode of the file
   */

  return;
}
// Geometry of the simulated disk. DISK_SIZE is parenthesised so that it
// expands safely inside any expression (e.g. `x % DISK_SIZE`).
#define DISK_SIZE  (1024*1024*16)            // total size in bytes (16 MiB)
#define BLOCK_SIZE 512                       // bytes per block
#define BLOCK_NUM  (DISK_SIZE/BLOCK_SIZE)    // number of blocks on the disk
// Number of inodes stored in one block.
#define IPB           (BLOCK_SIZE / sizeof(struct inode))

// Block containing inode i
#define IBLOCK(i)     ((i) / IPB + 2)

// Bitmap bits per block
#define BPB           (BLOCK_SIZE*8)

// Block containing bit for block b (b is parenthesised so that an
// expression argument such as `x + 1` is divided as a whole)
#define BBLOCK(b, ninodes) ((b)/BPB + (ninodes)/IPB + 3)

// Direct block slots per inode, block ids that fit in one indirect block,
// and the resulting maximum number of data blocks per file.
#define NDIRECT 32
#define NINDIRECT (BLOCK_SIZE / sizeof(blockid_t))
#define MAXFILE (NDIRECT + NINDIRECT)
+ * Test whether extent_client -> extent_server -> inode_manager behave correctly + */ + +#include "extent_client.h" +#include + +#define FILE_NUM 50 + +#define iprint(msg) \ + printf("[TEST_ERROR]: %s\n", msg); +extent_client *ec; +int total_score = 0; + +int test_create_and_getattr() +{ + int i, rnum; + extent_protocol::extentid_t id; + extent_protocol::attr a; + + printf("========== begin test create and getattr ==========\n"); + + srand((unsigned)time(NULL)); + + for (i = 0; i < FILE_NUM; i++) { + rnum = rand() % 10; + memset(&a, 0, sizeof(a)); + if (rnum < 3) { + ec->create(extent_protocol::T_DIR, id); + if ((int)id == 0) { + iprint("error creating dir\n"); + return 1; + } + if (ec->getattr(id, a) != extent_protocol::OK) { + iprint("error getting attr, return not OK\n"); + return 2; + } + if (a.type != extent_protocol::T_DIR) { + iprint("error getting attr, type is wrong"); + return 3; + } + } else { + ec->create(extent_protocol::T_FILE, id); + if ((int)id == 0) { + iprint("error creating dir\n"); + return 1; + } + if (ec->getattr(id, a) != extent_protocol::OK) { + iprint("error getting attr, return not OK\n"); + return 2; + } + if (a.type != extent_protocol::T_FILE) { + iprint("error getting attr, type is wrong"); + return 3; + } + } + } + total_score += 40; + printf("========== pass test create and getattr ==========\n"); + return 0; +} + + +int test_put_and_get() +{ + int i, rnum; + extent_protocol::extentid_t id; + extent_protocol::attr a; + int contents[FILE_NUM]; + char *temp = (char *)malloc(10); + + printf("========== begin test put and get ==========\n"); + srand((unsigned)time(NULL)); + for (i = 0; i < FILE_NUM; i++) { + memset(&a, 0, sizeof(a)); + id = (extent_protocol::extentid_t)(i+2); + if (ec->getattr(id, a) != extent_protocol::OK) { + iprint("error getting attr, return not OK\n"); + return 1; + } + if (a.type == extent_protocol::T_FILE) { + rnum = rand() % 10000; + memset(temp, 0, 10); + sprintf(temp, "%d", rnum); + std::string buf(temp); + if 
(ec->put(id, buf) != extent_protocol::OK) { + iprint("error put, return not OK\n"); + return 2; + } + contents[i] = rnum; + } + } + for (i = 0; i < FILE_NUM; i++) { + memset(&a, 0, sizeof(a)); + id = (extent_protocol::extentid_t)(i+2); + if (ec->getattr(id, a) != extent_protocol::OK) { + iprint("error getting attr, return not OK\n"); + return 3; + } + if (a.type == extent_protocol::T_FILE) { + std::string buf; + if (ec->get(id, buf) != extent_protocol::OK) { + iprint("error get, return not OK\n"); + return 4; + } + memset(temp, 0, 10); + sprintf(temp, "%d", contents[i]); + std::string buf2(temp); + if (buf.compare(buf2) != 0) { + std::cout << "[TEST_ERROR] : error get, not consistent with put " << + buf << " <-> " << buf2 << "\n\n"; + return 5; + } + } + } + + + total_score += 40; + printf("========== pass test put and get ==========\n"); + return 0; +} + +int test_remove() +{ + int i; + extent_protocol::extentid_t id; + extent_protocol::attr a; + + printf("========== begin test remove ==========\n"); + for (i = 0; i < FILE_NUM; i++) { + memset(&a, 0, sizeof(a)); + id = (extent_protocol::extentid_t)(i+2); + if (ec->remove(id) != extent_protocol::OK) { + iprint("error removing, return not OK\n"); + return 1; + } + ec->getattr(id, a); + if (a.type != 0) { + iprint("error removing, type is still positive\n"); + return 2; + } + } + total_score += 20; + printf("========== pass test remove ==========\n"); + return 0; +} + +int main(int argc, char *argv[]) +{ + if (argc != 1) { + printf("Usage: ./lab1_tester\n"); + return 1; + } + + ec = new extent_client(); + + if (test_create_and_getattr() != 0) + goto test_finish; + if (test_put_and_get() != 0) + goto test_finish; + if (test_remove() != 0) + goto test_finish; + +test_finish: + printf("---------------------------------\n"); + printf("Final score is : %d\n", total_score); + return 0; +} diff --git a/lang/algorithm.h b/lang/algorithm.h new file mode 100644 index 0000000..a487094 --- /dev/null +++ b/lang/algorithm.h @@ 
// Compile-time maximum of two ints: static_max<A, B>::value.
template <int A, int B>
struct static_max
{
  static const int value = A > B ? A : B;
};

// Compile-time minimum of two ints: static_min<A, B>::value.
template <int A, int B>
struct static_min
{
  static const int value = A < B ? A : B;
};
// Drops one reference to this connection. When the count reaches zero and
// the connection is already marked dead, the object deletes itself; callers
// must not touch it after calling decref().
void
connection::decref()
{
	VERIFY(pthread_mutex_lock(&ref_m_)==0);
	refno_ --;
	VERIFY(refno_>=0);
	if (refno_==0) {
		// dead_ is guarded by m_, so take it while still holding ref_m_.
		VERIFY(pthread_mutex_lock(&m_)==0);
		if (dead_) {
			// Release both mutexes before `delete this`: the destructor
			// calls pthread_mutex_destroy on each of them.
			VERIFY(pthread_mutex_unlock(&ref_m_)==0);
			VERIFY(pthread_mutex_unlock(&m_)==0);
			delete this;
			return;
		}
		VERIFY(pthread_mutex_unlock(&m_)==0);
	}
	pthread_mutex_unlock(&ref_m_);
}
&& wpdu_.solong < wpdu_.sz) {
+				VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0);
+			}
+		}
+	}
+	bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
+	wpdu_.solong = wpdu_.sz = 0;
+	wpdu_.buf = NULL;
+	if (waiters_ > 0)
+		pthread_cond_broadcast(&send_wait_);
+	return ret;
+}
+
+// Poll-manager callback: fd_ is ready to be written.
+// Pushes more of the pending outgoing pdu (wpdu_) to the socket.
+//  - nothing pending: the write callback is unregistered and we return;
+//  - write failure:   all callbacks are removed and the connection is dead;
+//  - partial write:   return and wait for the next writability event.
+// send_complete_ is signalled once the pdu is fully written or the
+// connection has died.
+void
+connection::write_cb(int s)
+{
+	ScopedLock ml(&m_);
+	VERIFY(!dead_);
+	VERIFY(fd_ == s);
+	if (wpdu_.sz == 0) {
+		PollMgr::Instance()->del_callback(fd_,CB_WRONLY);
+		return;
+	}
+	if (!writepdu()) {
+		PollMgr::Instance()->del_callback(fd_, CB_RDWR);
+		dead_ = true;
+	}else{
+		VERIFY(wpdu_.solong >= 0);
+		if (wpdu_.solong < wpdu_.sz) {
+			return;
+		}
+	}
+	pthread_cond_signal(&send_complete_);
+}
+
+// Poll-manager callback: fd_ is ready to be read.
+// Reads more of the incoming pdu (rpdu_). A failed read removes all
+// callbacks, marks the connection dead and wakes a sender blocked on
+// send_complete_. A fully received pdu is offered to the chanmgr; the
+// buffer is released from rpdu_ only if the chanmgr accepted it.
+void
+connection::read_cb(int s)
+{
+	ScopedLock ml(&m_);
+	VERIFY(fd_ == s);
+	if (dead_)  {
+		return;
+	}
+
+	bool succ = true;
+	if (!rpdu_.buf || rpdu_.solong < rpdu_.sz) {
+		succ = readpdu();
+	}
+
+	if (!succ) {
+		PollMgr::Instance()->del_callback(fd_,CB_RDWR);
+		dead_ = true;
+		pthread_cond_signal(&send_complete_);
+	}
+
+	if (rpdu_.buf && rpdu_.sz == rpdu_.solong) {
+		if (mgr_->got_pdu(this, rpdu_.buf, rpdu_.sz)) {
+			//chanmgr has successfully consumed the pdu
+			rpdu_.buf = NULL;
+			rpdu_.sz = rpdu_.solong = 0;
+		}
+	}
+}
+
+// Write as much of wpdu_ as the socket accepts right now.
+// On the first call for a pdu the total size is stored, in network byte
+// order, in the first sizeof(int) bytes of the buffer.
+// Returns true if progress was made or the socket would block (EAGAIN);
+// returns false on any other write error, after setting wpdu_.solong to -1
+// and wpdu_.sz to 0.
+bool
+connection::writepdu()
+{
+	VERIFY(wpdu_.solong >= 0);
+	if (wpdu_.solong == wpdu_.sz)
+		return true;
+
+	if (wpdu_.solong == 0) {
+		int sz = htonl(wpdu_.sz);
+		bcopy(&sz,wpdu_.buf,sizeof(sz));
+	}
+	int n = write(fd_, wpdu_.buf + wpdu_.solong, (wpdu_.sz-wpdu_.solong));
+	if (n < 0) {
+		// Save errno first: the logging call below may overwrite it.
+		const int err = errno;
+		if (err != EAGAIN) {
+			jsl_log(JSL_DBG_1, "connection::writepdu fd_ %d failure errno=%d\n", fd_, err);
+			wpdu_.solong = -1;
+			wpdu_.sz = 0;
+		}
+		return (err == EAGAIN);
+	}
+	wpdu_.solong += n;
+	return true;
+}
+
+// Read as much of the incoming pdu as is available right now.
+// If no pdu is in progress, first reads the 4-byte size prefix (network byte
+// order; the size counts the prefix itself), validates it and allocates the
+// receive buffer. Then reads payload bytes into the buffer.
+// Returns true if the pdu made progress or the socket would block; returns
+// false on EOF, on a read error, or on an unacceptable size prefix. When a
+// payload read fails the partially received pdu is freed and rpdu_ is reset.
+bool
+connection::readpdu()
+{
+	if (!rpdu_.sz) {
+		int sz, sz1;
+		int n = read(fd_, &sz1, sizeof(sz1));
+
+		if (n == 0) {
+			return false;
+		}
+
+		if (n < 0) {
+			VERIFY(errno!=EAGAIN);
+			return false;
+		}
+
+		if (n >0 && n!= sizeof(sz)) {
+			jsl_log(JSL_DBG_OFF, "connection::readpdu short read of sz\n");
+			return false;
+		}
+
+		sz = ntohl(sz1);
+
+		if (sz > MAX_PDU) {
+			char *tmpb = (char *)&sz1;
+			jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO BIG %d network order=%x %x %x %x %x\n", sz,
+					sz1, tmpb[0],tmpb[1],tmpb[2],tmpb[3]);
+			return false;
+		}
+
+		// The size comes from the peer and includes the prefix. Anything
+		// not larger than the prefix would make the byte count passed to
+		// read() below zero or negative (i.e. huge once converted to
+		// size_t), overrunning the buffer allocated here.
+		if (sz <= (int)sizeof(sz)) {
+			jsl_log(JSL_DBG_2, "connection::readpdu read pdu TOO SMALL %d\n", sz);
+			return false;
+		}
+
+		rpdu_.sz = sz;
+		VERIFY(rpdu_.buf == NULL);
+		rpdu_.buf = (char *)malloc(sz+sizeof(sz));
+		VERIFY(rpdu_.buf);
+		bcopy(&sz1,rpdu_.buf,sizeof(sz));
+		rpdu_.solong = sizeof(sz);
+	}
+
+	int n = read(fd_, rpdu_.buf + rpdu_.solong, rpdu_.sz - rpdu_.solong);
+	// errno is only meaningful when read() returned -1. Checking it after
+	// a return of 0 (EOF) could pick up a stale EAGAIN and keep a closed
+	// connection alive forever.
+	if (n < 0 && errno == EAGAIN) {
+		// nothing to read right now; keep the partial pdu and retry later
+		return true;
+	}
+	if (n <= 0) {
+		// EOF or hard error: drop the partially received pdu
+		if (rpdu_.buf)
+			free(rpdu_.buf);
+		rpdu_.buf = NULL;
+		rpdu_.sz = rpdu_.solong = 0;
+		return false;
+	}
+	rpdu_.solong += n;
+	return true;
+}
+
+// Create a listening TCP socket on the given port (all local addresses,
+// since sin_addr is left zeroed) and start the accept thread.
+// pipe_ is used to tell the accept thread to stop: the destructor closes
+// the write end. Any setup failure aborts via VERIFY(0).
+tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
+: mgr_(m1), lossy_(lossytest)
+{
+
+	VERIFY(pthread_mutex_init(&m_,NULL) == 0);
+
+	struct sockaddr_in sin;
+	memset(&sin, 0, sizeof(sin));
+	sin.sin_family = AF_INET;
+	sin.sin_port = htons(port);
+
+	tcp_ = socket(AF_INET, SOCK_STREAM, 0);
+	if(tcp_ < 0){
+		perror("tcpsconn::tcpsconn accept_loop socket:");
+		VERIFY(0);
+	}
+
+	int yes = 1;
+	setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
+	setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));
+
+	if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){
+		perror("accept_loop tcp bind:");
+		VERIFY(0);
+	}
+
+	if(listen(tcp_, 1000) < 0) {
+		perror("tcpsconn::tcpsconn listen:");
+		VERIFY(0);
+	}
+
+	// note: the second value logged is the port in network byte order
+	jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port,
+		sin.sin_port);
+
+	if (pipe(pipe_) < 0) {
+		perror("accept_loop pipe:");
+		VERIFY(0);
+	}
+
+	// make the read end of the shutdown pipe non-blocking
+	int flags = fcntl(pipe_[0], F_GETFL, NULL);
+	flags |= O_NONBLOCK;
+	fcntl(pipe_[0], F_SETFL, flags);
+
+	VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0);
+}
+
+tcpsconn::~tcpsconn()
+{
+	VERIFY(close(pipe_[1]) == 0);
+	VERIFY(pthread_join(th_, NULL) == 
0); + + //close all the active connections + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end(); i++) { + i->second->closeconn(); + i->second->decref(); + } +} + +void +tcpsconn::process_accept() +{ + sockaddr_in sin; + socklen_t slen = sizeof(sin); + int s1 = accept(tcp_, (sockaddr *)&sin, &slen); + if (s1 < 0) { + perror("tcpsconn::accept_conn error"); + pthread_exit(NULL); + } + + jsl_log(JSL_DBG_2, "accept_loop got connection fd=%d %s:%d\n", + s1, inet_ntoa(sin.sin_addr), ntohs(sin.sin_port)); + connection *ch = new connection(mgr_, s1, lossy_); + + // garbage collect all dead connections with refcount of 1 + std::map::iterator i; + for (i = conns_.begin(); i != conns_.end();) { + if (i->second->isdead() && i->second->ref() == 1) { + jsl_log(JSL_DBG_2, "accept_loop garbage collected fd=%d\n", + i->second->channo()); + i->second->decref(); + // Careful not to reuse i right after erase. (i++) will + // be evaluated before the erase call because in C++, + // there is a sequence point before a function call. + // See http://en.wikipedia.org/wiki/Sequence_point. + conns_.erase(i++); + } else + ++i; + } + + conns_[ch->channo()] = ch; +} + +void +tcpsconn::accept_conn() +{ + fd_set rfds; + int max_fd = pipe_[0] > tcp_ ? 
pipe_[0] : tcp_; + + while (1) { + FD_ZERO(&rfds); + FD_SET(pipe_[0], &rfds); + FD_SET(tcp_, &rfds); + + int ret = select(max_fd+1, &rfds, NULL, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + continue; + } else { + perror("accept_conn select:"); + jsl_log(JSL_DBG_OFF, "tcpsconn::accept_conn failure errno %d\n",errno); + VERIFY(0); + } + } + + if (FD_ISSET(pipe_[0], &rfds)) { + close(pipe_[0]); + close(tcp_); + return; + } + else if (FD_ISSET(tcp_, &rfds)) { + process_accept(); + } else { + VERIFY(0); + } + } +} + +connection * +connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy) +{ + int s= socket(AF_INET, SOCK_STREAM, 0); + int yes = 1; + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)); + if(connect(s, (sockaddr*)&dst, sizeof(dst)) < 0) { + jsl_log(JSL_DBG_1, "rpcc::connect_to_dst failed to %s:%d\n", + inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); + close(s); + return NULL; + } + jsl_log(JSL_DBG_2, "connect_to_dst fd=%d to dst %s:%d\n", + s, inet_ntoa(dst.sin_addr), (int)ntohs(dst.sin_port)); + return new connection(mgr, s, lossy); +} + + diff --git a/rpc/connection.h b/rpc/connection.h new file mode 100644 index 0000000..2c84daf --- /dev/null +++ b/rpc/connection.h @@ -0,0 +1,101 @@ +#ifndef connection_h +#define connection_h 1 + +#include +#include +#include +#include +#include + +#include + +#include "pollmgr.h" + +class connection; + +class chanmgr { + public: + virtual bool got_pdu(connection *c, char *b, int sz) = 0; + virtual ~chanmgr() {} +}; + +class connection : public aio_callback { + public: + struct charbuf { + charbuf(): buf(NULL), sz(0), solong(0) {} + charbuf (char *b, int s) : buf(b), sz(s), solong(0){} + char *buf; + int sz; + int solong; //amount of bytes written or read so far + }; + + connection(chanmgr *m1, int f1, int lossytest=0); + ~connection(); + + int channo() { return fd_; } + bool isdead(); + void closeconn(); + + bool send(char *b, int sz); + void write_cb(int s); + void read_cb(int s); + + 
void incref(); + void decref(); + int ref(); + + int compare(connection *another); + private: + + bool readpdu(); + bool writepdu(); + + chanmgr *mgr_; + const int fd_; + bool dead_; + + charbuf wpdu_; + charbuf rpdu_; + + struct timeval create_time_; + + int waiters_; + int refno_; + const int lossy_; + + pthread_mutex_t m_; + pthread_mutex_t ref_m_; + pthread_cond_t send_complete_; + pthread_cond_t send_wait_; +}; + +class tcpsconn { + public: + tcpsconn(chanmgr *m1, int port, int lossytest=0); + ~tcpsconn(); + + void accept_conn(); + private: + + pthread_mutex_t m_; + pthread_t th_; + int pipe_[2]; + + int tcp_; //file desciptor for accepting connection + chanmgr *mgr_; + int lossy_; + std::map conns_; + + void process_accept(); +}; + +struct bundle { + bundle(chanmgr *m, int s, int l):mgr(m),tcp(s),lossy(l) {} + chanmgr *mgr; + int tcp; + int lossy; +}; + +void start_accept_thread(chanmgr *mgr, int port, pthread_t *th, int *fd = NULL, int lossy=0); +connection *connect_to_dst(const sockaddr_in &dst, chanmgr *mgr, int lossy=0); +#endif diff --git a/rpc/fifo.h b/rpc/fifo.h new file mode 100644 index 0000000..979cf62 --- /dev/null +++ b/rpc/fifo.h @@ -0,0 +1,94 @@ +#ifndef fifo_h +#define fifo_h + +// fifo template +// blocks enq() and deq() when queue is FULL or EMPTY + +#include +#include +#include +#include +#include +#include "slock.h" +#include "lang/verify.h" + +template +class fifo { + public: + fifo(int m=0); + ~fifo(); + bool enq(T, bool blocking=true); + void deq(T *); + bool size(); + + private: + std::list q_; + pthread_mutex_t m_; + pthread_cond_t non_empty_c_; // q went non-empty + pthread_cond_t has_space_c_; // q is not longer overfull + unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit +}; + +template +fifo::fifo(int limit) : max_(limit) +{ + VERIFY(pthread_mutex_init(&m_, 0) == 0); + VERIFY(pthread_cond_init(&non_empty_c_, 0) == 0); + VERIFY(pthread_cond_init(&has_space_c_, 0) == 0); +} + +template 
+fifo::~fifo() +{ + //fifo is to be deleted only when no threads are using it! + VERIFY(pthread_mutex_destroy(&m_)==0); + VERIFY(pthread_cond_destroy(&non_empty_c_) == 0); + VERIFY(pthread_cond_destroy(&has_space_c_) == 0); +} + +template bool +fifo::size() +{ + ScopedLock ml(&m_); + return q_.size(); +} + +template bool +fifo::enq(T e, bool blocking) +{ + ScopedLock ml(&m_); + while (1) { + if (!max_ || q_.size() < max_) { + q_.push_back(e); + break; + } + if (blocking) + VERIFY(pthread_cond_wait(&has_space_c_, &m_) == 0); + else + return false; + } + VERIFY(pthread_cond_signal(&non_empty_c_) == 0); + return true; +} + +template void +fifo::deq(T *e) +{ + ScopedLock ml(&m_); + + while(1) { + if(q_.empty()){ + VERIFY (pthread_cond_wait(&non_empty_c_, &m_) == 0); + } else { + *e = q_.front(); + q_.pop_front(); + if (max_ && q_.size() < max_) { + VERIFY(pthread_cond_signal(&has_space_c_)==0); + } + break; + } + } + return; +} + +#endif diff --git a/rpc/jsl_log.cc b/rpc/jsl_log.cc new file mode 100644 index 0000000..06e5c2c --- /dev/null +++ b/rpc/jsl_log.cc @@ -0,0 +1,9 @@ +#include "jsl_log.h" + +int JSL_DEBUG_LEVEL = 0; +void +jsl_set_debug(int level) { + JSL_DEBUG_LEVEL = level; +} + + diff --git a/rpc/jsl_log.h b/rpc/jsl_log.h new file mode 100644 index 0000000..7f92998 --- /dev/null +++ b/rpc/jsl_log.h @@ -0,0 +1,25 @@ +#ifndef __JSL_LOG_H__ +#define __JSL_LOG_H__ 1 + +enum dbcode { + JSL_DBG_OFF = 0, + JSL_DBG_1 = 1, // Critical + JSL_DBG_2 = 2, // Error + JSL_DBG_3 = 3, // Info + JSL_DBG_4 = 4, // Debugging +}; + +extern int JSL_DEBUG_LEVEL; + +#define jsl_log(level,...) 
\ + do { \ + if(JSL_DEBUG_LEVEL < abs(level)) \ + {;} \ + else { \ + { printf(__VA_ARGS__);} \ + } \ + } while(0) + +void jsl_set_debug(int level); + +#endif // __JSL_LOG_H__ diff --git a/rpc/marshall.h b/rpc/marshall.h new file mode 100644 index 0000000..e0370d1 --- /dev/null +++ b/rpc/marshall.h @@ -0,0 +1,261 @@ +#ifndef marshall_h +#define marshall_h + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "lang/verify.h" +#include "lang/algorithm.h" + +struct req_header { + req_header(int x=0, int p=0, int c = 0, int s = 0, int xi = 0): + xid(x), proc(p), clt_nonce(c), srv_nonce(s), xid_rep(xi) {} + int xid; + int proc; + unsigned int clt_nonce; + unsigned int srv_nonce; + int xid_rep; +}; + +struct reply_header { + reply_header(int x=0, int r=0): xid(x), ret(r) {} + int xid; + int ret; +}; + +typedef uint64_t rpc_checksum_t; +typedef int rpc_sz_t; + +enum { + //size of initial buffer allocation + DEFAULT_RPC_SZ = 1024, +#if RPC_CHECKSUMMING + //size of rpc_header includes a 4-byte int to be filled by tcpchan and uint64_t checksum + RPC_HEADER_SZ = static_max::value + sizeof(rpc_sz_t) + sizeof(rpc_checksum_t) +#else + RPC_HEADER_SZ = static_max::value + sizeof(rpc_sz_t) +#endif +}; + +class marshall { + private: + char *_buf; // Base of the raw bytes buffer (dynamically readjusted) + int _capa; // Capacity of the buffer + int _ind; // Read/write head position + + public: + marshall() { + _buf = (char *) malloc(sizeof(char)*DEFAULT_RPC_SZ); + VERIFY(_buf); + _capa = DEFAULT_RPC_SZ; + _ind = RPC_HEADER_SZ; + } + + ~marshall() { + if (_buf) + free(_buf); + } + + int size() { return _ind;} + char *cstr() { return _buf;} + + void rawbyte(unsigned char); + void rawbytes(const char *, int); + + // Return the current content (excluding header) as a string + std::string get_content() { + return std::string(_buf+RPC_HEADER_SZ,_ind-RPC_HEADER_SZ); + } + + // Return the current content (excluding header) as a string + 
std::string str() { + return get_content(); + } + + void pack(int i); + + void pack_req_header(const req_header &h) { + int saved_sz = _ind; + //leave the first 4-byte empty for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + pack(h.xid); + pack(h.proc); + pack((int)h.clt_nonce); + pack((int)h.srv_nonce); + pack(h.xid_rep); + _ind = saved_sz; + } + + void pack_reply_header(const reply_header &h) { + int saved_sz = _ind; + //leave the first 4-byte empty for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + pack(h.xid); + pack(h.ret); + _ind = saved_sz; + } + + void take_buf(char **b, int *s) { + *b = _buf; + *s = _ind; + _buf = NULL; + _ind = 0; + return; + } +}; +marshall& operator<<(marshall &, bool); +marshall& operator<<(marshall &, unsigned int); +marshall& operator<<(marshall &, int); +marshall& operator<<(marshall &, unsigned char); +marshall& operator<<(marshall &, char); +marshall& operator<<(marshall &, unsigned short); +marshall& operator<<(marshall &, short); +marshall& operator<<(marshall &, unsigned long long); +marshall& operator<<(marshall &, const std::string &); + +class unmarshall { + private: + char *_buf; + int _sz; + int _ind; + bool _ok; + public: + unmarshall(): _buf(NULL),_sz(0),_ind(0),_ok(false) {} + unmarshall(char *b, int sz): _buf(b),_sz(sz),_ind(),_ok(true) {} + unmarshall(const std::string &s) : _buf(NULL),_sz(0),_ind(0),_ok(false) + { + //take the content which does not exclude a RPC header from a string + take_content(s); + } + ~unmarshall() { + if (_buf) free(_buf); + } + + //take contents from another unmarshall object + void take_in(unmarshall &another); + + //take the content which does not exclude a RPC header from a string + void take_content(const std::string &s) { + _sz = s.size()+RPC_HEADER_SZ; + _buf = (char *)realloc(_buf,_sz); + VERIFY(_buf); + _ind = RPC_HEADER_SZ; + 
memcpy(_buf+_ind, s.data(), s.size()); + _ok = true; + } + + bool ok() { return _ok; } + char *cstr() { return _buf;} + bool okdone(); + unsigned int rawbyte(); + void rawbytes(std::string &s, unsigned int n); + + int ind() { return _ind;} + int size() { return _sz;} + void unpack(int *); //non-const ref + void take_buf(char **b, int *sz) { + *b = _buf; + *sz = _sz; + _sz = _ind = 0; + _buf = NULL; + } + + void unpack_req_header(req_header *h) { + //the first 4-byte is for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + unpack(&h->xid); + unpack(&h->proc); + unpack((int *)&h->clt_nonce); + unpack((int *)&h->srv_nonce); + unpack(&h->xid_rep); + _ind = RPC_HEADER_SZ; + } + + void unpack_reply_header(reply_header *h) { + //the first 4-byte is for channel to fill size of pdu + _ind = sizeof(rpc_sz_t); +#if RPC_CHECKSUMMING + _ind += sizeof(rpc_checksum_t); +#endif + unpack(&h->xid); + unpack(&h->ret); + _ind = RPC_HEADER_SZ; + } +}; + +unmarshall& operator>>(unmarshall &, bool &); +unmarshall& operator>>(unmarshall &, unsigned char &); +unmarshall& operator>>(unmarshall &, char &); +unmarshall& operator>>(unmarshall &, unsigned short &); +unmarshall& operator>>(unmarshall &, short &); +unmarshall& operator>>(unmarshall &, unsigned int &); +unmarshall& operator>>(unmarshall &, int &); +unmarshall& operator>>(unmarshall &, unsigned long long &); +unmarshall& operator>>(unmarshall &, std::string &); + +template marshall & +operator<<(marshall &m, std::vector v) +{ + m << (unsigned int) v.size(); + for(unsigned i = 0; i < v.size(); i++) + m << v[i]; + return m; +} + +template unmarshall & +operator>>(unmarshall &u, std::vector &v) +{ + unsigned n; + u >> n; + for(unsigned i = 0; i < n; i++){ + C z; + u >> z; + v.push_back(z); + } + return u; +} + +template marshall & +operator<<(marshall &m, const std::map &d) { + typename std::map::const_iterator i; + + m << (unsigned int) d.size(); + + for (i = 
d.begin(); i != d.end(); i++) { + m << i->first << i->second; + } + return m; +} + +template unmarshall & +operator>>(unmarshall &u, std::map &d) { + unsigned int n; + u >> n; + + d.clear(); + + for (unsigned int lcv = 0; lcv < n; lcv++) { + A a; + B b; + u >> a >> b; + d[a] = b; + } + return u; +} + +#endif diff --git a/rpc/method_thread.h b/rpc/method_thread.h new file mode 100644 index 0000000..bcbc08b --- /dev/null +++ b/rpc/method_thread.h @@ -0,0 +1,164 @@ +#ifndef method_thread_h +#define method_thread_h + +// method_thread(): start a thread that runs an object method. +// returns a pthread_t on success, and zero on error. + +#include +#include +#include +#include +#include "lang/verify.h" + +static pthread_t +method_thread_parent(void *(*fn)(void *), void *arg, bool detach) +{ + pthread_t th; + pthread_attr_t attr; + pthread_attr_init(&attr); + // set stack size to 100K, so we don't run out of memory + pthread_attr_setstacksize(&attr, 100*1024); + int err = pthread_create(&th, &attr, fn, arg); + pthread_attr_destroy(&attr); + if (err != 0) { + fprintf(stderr, "pthread_create ret %d %s\n", err, strerror(err)); + exit(1); + } + + if (detach) { + // don't keep thread state around after exit, to avoid + // running out of threads. set detach==false if you plan + // to pthread_join. + VERIFY(pthread_detach(th) == 0); + } + + return th; +} + +static void +method_thread_child() +{ + // defer pthread_cancel() by default. check explicitly by + // enabling then pthread_testcancel(). 
+ int oldstate, oldtype; + VERIFY(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) == 0); + VERIFY(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &oldtype) == 0); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)()) +{ + class XXX { + public: + C *o; + void (C::*m)(); + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)() = x->m; + delete x; + method_thread_child(); + (o->*m)(); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A), A a) +{ + class XXX { + public: + C *o; + void (C::*m)(A a); + A a; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A ) = x->m; + A a = x->a; + delete x; + method_thread_child(); + (o->*m)(a); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a = a; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +namespace { + // ~xavid: this causes a bizzare compile error on OS X.5 when + // it's declared in the function, so I moved it out here. 
+ template + class XXX { + public: + C *o; + void (C::*m)(A1 a1, A2 a2); + A1 a1; + A2 a2; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A1 , A2 ) = x->m; + A1 a1 = x->a1; + A2 a2 = x->a2; + delete x; + method_thread_child(); + (o->*m)(a1, a2); + return 0; + } + }; +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A1 , A2 ), A1 a1, A2 a2) +{ + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a1 = a1; + x->a2 = a2; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +template pthread_t +method_thread(C *o, bool detach, void (C::*m)(A1 , A2, A3 ), A1 a1, A2 a2, A3 a3) +{ + class XXX { + public: + C *o; + void (C::*m)(A1 a1, A2 a2, A3 a3); + A1 a1; + A2 a2; + A3 a3; + static void *yyy(void *vvv) { + XXX *x = (XXX*)vvv; + C *o = x->o; + void (C::*m)(A1 , A2 , A3 ) = x->m; + A1 a1 = x->a1; + A2 a2 = x->a2; + A3 a3 = x->a3; + delete x; + method_thread_child(); + (o->*m)(a1, a2, a3); + return 0; + } + }; + XXX *x = new XXX; + x->o = o; + x->m = m; + x->a1 = a1; + x->a2 = a2; + x->a3 = a3; + return method_thread_parent(&XXX::yyy, (void *) x, detach); +} + +#endif diff --git a/rpc/pollmgr.cc b/rpc/pollmgr.cc new file mode 100644 index 0000000..f73a3a5 --- /dev/null +++ b/rpc/pollmgr.cc @@ -0,0 +1,360 @@ +#include +#include +#include +#include + +#include "slock.h" +#include "jsl_log.h" +#include "method_thread.h" +#include "lang/verify.h" +#include "pollmgr.h" + +PollMgr *PollMgr::instance = NULL; +static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT; + +void +PollMgrInit() +{ + PollMgr::instance = new PollMgr(); +} + +PollMgr * +PollMgr::Instance() +{ + pthread_once(&pollmgr_is_initialized, PollMgrInit); + return instance; +} + +PollMgr::PollMgr() : pending_change_(false) +{ + bzero(callbacks_, MAX_POLL_FDS*sizeof(void *)); + aio_ = new SelectAIO(); + //aio_ = new EPollAIO(); + + VERIFY(pthread_mutex_init(&m_, NULL) == 0); + VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0); + 
VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
+}
+
+PollMgr::~PollMgr()
+{
+	//never kill me!!!
+	VERIFY(0);
+}
+
+// Register ch as the callback object for fd and start watching fd for the
+// events named by flag. An fd may only ever be bound to one callback object;
+// re-registering the same object (e.g. to add another flag) is allowed.
+void
+PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
+{
+	// fd indexes callbacks_[], so it must be within [0, MAX_POLL_FDS)
+	VERIFY(fd >= 0 && fd < MAX_POLL_FDS);
+
+	ScopedLock ml(&m_);
+	aio_->watch_fd(fd, flag);
+
+	VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
+	callbacks_[fd] = ch;
+}
+
+//remove all callbacks related to fd
+//the return guarantees that callbacks related to fd
+//will never be called again
+void
+PollMgr::block_remove_fd(int fd)
+{
+	VERIFY(fd >= 0 && fd < MAX_POLL_FDS);
+
+	ScopedLock ml(&m_);
+	aio_->unwatch_fd(fd, CB_RDWR);
+	pending_change_ = true;
+	// wait_loop() clears pending_change_ (under m_) before it broadcasts
+	// changedone_c_. Re-check the flag after every wakeup: a condition
+	// wait may return spuriously, and clearing callbacks_[fd] before the
+	// poll thread has acknowledged the change would break the guarantee
+	// stated above.
+	while (pending_change_) {
+		VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
+	}
+	callbacks_[fd] = NULL;
+}
+
+// Stop watching fd for the events named by flag. The callback object is
+// forgotten only when unwatch_fd() reports that nothing is watched on fd
+// any more.
+void
+PollMgr::del_callback(int fd, poll_flag flag)
+{
+	VERIFY(fd >= 0 && fd < MAX_POLL_FDS);
+
+	ScopedLock ml(&m_);
+	if (aio_->unwatch_fd(fd, flag)) {
+		callbacks_[fd] = NULL;
+	}
+}
+
+// Return true iff c is the callback object registered for fd and fd is
+// currently watched for flag.
+bool
+PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
+{
+	VERIFY(fd >= 0 && fd < MAX_POLL_FDS);
+
+	ScopedLock ml(&m_);
+	if (!callbacks_[fd] || callbacks_[fd]!=c)
+		return false;
+
+	return aio_->is_watched(fd, flag);
+}
+
+// Body of the polling thread (started from the constructor); never returns.
+// Each iteration first acknowledges a pending block_remove_fd() request,
+// then waits for ready descriptors and dispatches read callbacks followed
+// by write callbacks.
+void
+PollMgr::wait_loop()
+{
+
+	// ready file descriptors reported by the aio manager
+	std::vector<int> readable;
+	std::vector<int> writable;
+
+	while (1) {
+		{
+			ScopedLock ml(&m_);
+			if (pending_change_) {
+				pending_change_ = false;
+				VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
+			}
+		}
+		readable.clear();
+		writable.clear();
+		aio_->wait_ready(&readable,&writable);
+
+		if (!readable.size() && !writable.size()) {
+			continue;
+		}
+		//no locking of m_
+		//because no add_callback() and del_callback should
+		//modify callbacks_[fd] while the fd is not dead
+		for (unsigned int i = 0; i < readable.size(); i++) {
+			int fd = readable[i];
+			if (callbacks_[fd])
+				callbacks_[fd]->read_cb(fd);
+		}
+
+		for (unsigned int i = 0; i < writable.size(); i++) {
+			int fd = writable[i];
+			if (callbacks_[fd])
+				callbacks_[fd]->write_cb(fd);
+		}
+	}
+}
+
+SelectAIO::SelectAIO() : highfds_(0)
+{
+	FD_ZERO(&rfds_);
+	FD_ZERO(&wfds_);
+
+	VERIFY(pipe(pipefd_) == 0);
+	FD_SET(pipefd_[0], &rfds_);
+	highfds_ = 
pipefd_[0]; + + int flags = fcntl(pipefd_[0], F_GETFL, NULL); + flags |= O_NONBLOCK; + fcntl(pipefd_[0], F_SETFL, flags); + + VERIFY(pthread_mutex_init(&m_, NULL) == 0); +} + +SelectAIO::~SelectAIO() +{ + VERIFY(pthread_mutex_destroy(&m_) == 0); +} + +void +SelectAIO::watch_fd(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (highfds_ <= fd) + highfds_ = fd; + + if (flag == CB_RDONLY) { + FD_SET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + FD_SET(fd,&wfds_); + }else { + FD_SET(fd,&rfds_); + FD_SET(fd,&wfds_); + } + + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); +} + +bool +SelectAIO::is_watched(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (flag == CB_RDONLY) { + return FD_ISSET(fd,&rfds_); + }else if (flag == CB_WRONLY) { + return FD_ISSET(fd,&wfds_); + }else{ + return (FD_ISSET(fd,&rfds_) && FD_ISSET(fd,&wfds_)); + } +} + +bool +SelectAIO::unwatch_fd(int fd, poll_flag flag) +{ + ScopedLock ml(&m_); + if (flag == CB_RDONLY) { + FD_CLR(fd, &rfds_); + }else if (flag == CB_WRONLY) { + FD_CLR(fd, &wfds_); + }else if (flag == CB_RDWR) { + FD_CLR(fd, &wfds_); + FD_CLR(fd, &rfds_); + }else{ + VERIFY(0); + } + + if (!FD_ISSET(fd,&rfds_) && !FD_ISSET(fd,&wfds_)) { + if (fd == highfds_) { + int newh = pipefd_[0]; + for (int i = 0; i <= highfds_; i++) { + if (FD_ISSET(i, &rfds_)) { + newh = i; + }else if (FD_ISSET(i, &wfds_)) { + newh = i; + } + } + highfds_ = newh; + } + } + if (flag == CB_RDWR) { + char tmp = 1; + VERIFY(write(pipefd_[1], &tmp, sizeof(tmp))==1); + } + return (!FD_ISSET(fd, &rfds_) && !FD_ISSET(fd, &wfds_)); +} + +void +SelectAIO::wait_ready(std::vector *readable, std::vector *writable) +{ + fd_set trfds, twfds; + int high; + + { + ScopedLock ml(&m_); + trfds = rfds_; + twfds = wfds_; + high = highfds_; + + } + + int ret = select(high+1, &trfds, &twfds, NULL, NULL); + + if (ret < 0) { + if (errno == EINTR) { + return; + } else { + perror("select:"); + jsl_log(JSL_DBG_OFF, "PollMgr::select_loop failure errno %d\n",errno); + 
VERIFY(0); + } + } + + for (int fd = 0; fd <= high; fd++) { + if (fd == pipefd_[0] && FD_ISSET(fd, &trfds)) { + char tmp; + VERIFY (read(pipefd_[0],&tmp,sizeof(tmp))==1); + VERIFY(tmp==1); + }else { + if (FD_ISSET(fd, &twfds)) { + writable->push_back(fd); + } + if (FD_ISSET(fd, &trfds)) { + readable->push_back(fd); + } + } + } +} + +#ifdef __linux__ + +EPollAIO::EPollAIO() +{ + pollfd_ = epoll_create(MAX_POLL_FDS); + VERIFY(pollfd_ >= 0); + bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS); +} + +EPollAIO::~EPollAIO() +{ + close(pollfd_); +} + +static inline +int poll_flag_to_event(poll_flag flag) +{ + int f; + if (flag == CB_RDONLY) { + f = EPOLLIN; + }else if (flag == CB_WRONLY) { + f = EPOLLOUT; + }else { //flag == CB_RDWR + f = EPOLLIN | EPOLLOUT; + } + return f; +} + +void +EPollAIO::watch_fd(int fd, poll_flag flag) +{ + VERIFY(fd < MAX_POLL_FDS); + + struct epoll_event ev; + int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD; + fdstatus_[fd] |= (int)flag; + + ev.events = EPOLLET; + ev.data.fd = fd; + + if (fdstatus_[fd] & CB_RDONLY) { + ev.events |= EPOLLIN; + } + if (fdstatus_[fd] & CB_WRONLY) { + ev.events |= EPOLLOUT; + } + + if (flag == CB_RDWR) { + VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT)); + } + + VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0); +} + +bool +EPollAIO::unwatch_fd(int fd, poll_flag flag) +{ + VERIFY(fd < MAX_POLL_FDS); + fdstatus_[fd] &= ~(int)flag; + + struct epoll_event ev; + int op = fdstatus_[fd]? 
EPOLL_CTL_MOD : EPOLL_CTL_DEL;
+
+	ev.events = EPOLLET;
+	ev.data.fd = fd;
+
+	if (fdstatus_[fd] & CB_RDONLY) {
+		ev.events |= EPOLLIN;
+	}
+	if (fdstatus_[fd] & CB_WRONLY) {
+		ev.events |= EPOLLOUT;
+	}
+
+	if (flag == CB_RDWR) {
+		VERIFY(op == EPOLL_CTL_DEL);
+	}
+	VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
+	return (op == EPOLL_CTL_DEL);
+}
+
+// Return true iff every event named by flag is currently watched on fd
+// (for CB_RDWR: both reading and writing), matching SelectAIO::is_watched.
+bool
+EPollAIO::is_watched(int fd, poll_flag flag)
+{
+	// fd indexes fdstatus_[], so it must be within [0, MAX_POLL_FDS)
+	VERIFY(fd >= 0 && fd < MAX_POLL_FDS);
+	// fdstatus_[fd] is the OR of the CB_* flags being watched. Test the
+	// requested bits directly: masking with CB_MASK (~0x11) would clear
+	// exactly those bits, so the comparison could never succeed for
+	// CB_RDONLY, CB_WRONLY or CB_RDWR.
+	return ((fdstatus_[fd] & (int)flag) == (int)flag);
+}
+
+// Block in epoll_wait() and append every descriptor reported readable
+// (EPOLLIN) to *readable and every one reported writable (EPOLLOUT) to
+// *writable. If epoll_wait() fails (negative return, e.g. when interrupted)
+// nothing is appended.
+void
+EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
+{
+	int nfds = epoll_wait(pollfd_, ready_,	MAX_POLL_FDS, -1);
+	for (int i = 0; i < nfds; i++) {
+		if (ready_[i].events & EPOLLIN) {
+			readable->push_back(ready_[i].data.fd);
+		}
+		if (ready_[i].events & EPOLLOUT) {
+			writable->push_back(ready_[i].data.fd);
+		}
+	}
+}
+
+#endif
diff --git a/rpc/pollmgr.h b/rpc/pollmgr.h
new file mode 100644
index 0000000..c0a5748
--- /dev/null
+++ b/rpc/pollmgr.h
@@ -0,0 +1,107 @@
+#ifndef pollmgr_h
+#define pollmgr_h
+
+#include <sys/select.h>
+#include <vector>
+
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
+
+#define MAX_POLL_FDS 128
+
+typedef enum {
+	CB_NONE = 0x0,
+	CB_RDONLY = 0x1,	// watch for readability
+	CB_WRONLY = 0x10,	// watch for writability
+	CB_RDWR = 0x11,	// CB_RDONLY | CB_WRONLY
+	CB_MASK = ~0x11,	// complement of CB_RDWR: every bit that is not a watch flag
+} poll_flag;
+
+class aio_mgr {
+	public:
+		virtual void watch_fd(int fd, poll_flag flag) = 0;
+		virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
+		virtual bool is_watched(int fd, poll_flag flag) = 0;
+		virtual void wait_ready(std::vector<int> *readable, std::vector<int> *writable) = 0;
+		virtual ~aio_mgr() {}
+};
+
+class aio_callback {
+	public:
+		virtual void read_cb(int fd) = 0;
+		virtual void write_cb(int fd) = 0;
+		virtual ~aio_callback() {}
+};
+
+class PollMgr {
+	public:
+		PollMgr();
+		~PollMgr();
+
+		static PollMgr *Instance();
+		static PollMgr *CreateInst();
+
+		void add_callback(int fd, poll_flag flag, aio_callback *ch);
+		void del_callback(int fd, poll_flag flag);
+		bool has_callback(int fd, poll_flag flag, aio_callback *ch);
+		void block_remove_fd(int fd);
+		void 
wait_loop(); + + + static PollMgr *instance; + static int useful; + static int useless; + + private: + pthread_mutex_t m_; + pthread_cond_t changedone_c_; + pthread_t th_; + + aio_callback *callbacks_[MAX_POLL_FDS]; + aio_mgr *aio_; + bool pending_change_; + +}; + +class SelectAIO : public aio_mgr { + public : + + SelectAIO(); + ~SelectAIO(); + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + bool is_watched(int fd, poll_flag flag); + void wait_ready(std::vector *readable, std::vector *writable); + + private: + + fd_set rfds_; + fd_set wfds_; + int highfds_; + int pipefd_[2]; + + pthread_mutex_t m_; + +}; + +#ifdef __linux__ +class EPollAIO : public aio_mgr { + public: + EPollAIO(); + ~EPollAIO(); + void watch_fd(int fd, poll_flag flag); + bool unwatch_fd(int fd, poll_flag flag); + bool is_watched(int fd, poll_flag flag); + void wait_ready(std::vector *readable, std::vector *writable); + + private: + int pollfd_; + struct epoll_event ready_[MAX_POLL_FDS]; + int fdstatus_[MAX_POLL_FDS]; + +}; +#endif /* __linux */ + +#endif /* pollmgr_h */ + diff --git a/rpc/rpc.cc b/rpc/rpc.cc new file mode 100644 index 0000000..2cebacf --- /dev/null +++ b/rpc/rpc.cc @@ -0,0 +1,1081 @@ +/* + The rpcc class handles client-side RPC. Each rpcc is bound to a + single RPC server. The jobs of rpcc include maintaining a connection to + server, sending RPC requests and waiting for responses, retransmissions, + at-most-once delivery etc. + + The rpcs class handles the server side of RPC. Each rpcs handles multiple + connections from different rpcc objects. The jobs of rpcs include accepting + connections, dispatching requests to registered RPC handlers, at-most-once + delivery etc. + + Both rpcc and rpcs use the connection class as an abstraction for the + underlying communication channel. 
To send an RPC request/reply, one calls + connection::send() which blocks until data is sent or the connection has failed + (thus the caller can free the buffer when send() returns). When a + request/reply is received, connection makes a callback into the corresponding + rpcc or rpcs (see rpcc::got_pdu() and rpcs::got_pdu()). + + Thread organization: + rpcc uses application threads to send RPC requests and blocks to receive the + reply or error. All connections use a single PollMgr object to perform async + socket IO. PollMgr creates a single thread to examine the readiness of socket + file descriptors and informs the corresponding connection whenever a socket is + ready to be read or written. (We use asynchronous socket IO to reduce the + number of threads needed to manage these connections; without async IO, at + least one thread is needed per connection to read data without blocking other + activities.) Each rpcs object creates one thread for listening on the server + port and a pool of threads for executing RPC requests. The + thread pool allows us to control the number of threads spawned at the server + (spawning one thread per request will hurt when the server faces thousands of + requests). + + In order to delete a connection object, we must maintain a reference count. + For rpcc, + multiple client threads might be invoking the rpcc::call() functions and thus + holding multiple references to the underlying connection object. For rpcs, + multiple dispatch threads might be holding references to the same connection + object. A connection object is deleted only when the underlying connection is + dead and the reference count reaches zero. + + The previous version of the RPC library uses pthread_cancel* routines + to implement the deletion of rpcc and rpcs objects. The idea is to cancel + all active threads that might be holding a reference to an object before + deleting that object. 
However, pthread_cancel is not robust and there are + always bugs where outstanding references to deleted objects persist. + This version of the RPC library does not do pthread_cancel, but explicitly + joins exited threads to make sure no outstanding references exist before + deleting objects. + + To delete a rpcc object safely, the users of the library must ensure that + there are no outstanding calls on the rpcc object. + + To delete a rpcs object safely, we do the following in sequence: 1. stop + accepting new incoming connections. 2. close existing active connections. + 3. delete the dispatch thread pool which involves waiting for current active + RPC handlers to finish. It is interesting how a thread pool can be deleted + without using thread cancellation. The trick is to inject x "poison pills" for + a thread pool of x threads. Upon getting a poison pill instead of a normal + task, a worker thread will exit (and thread pool destructor waits to join all + x exited worker threads). + */ + +#include "rpc.h" +#include "method_thread.h" +#include "slock.h" + +#include +#include +#include +#include +#include + +#include "jsl_log.h" +#include "gettime.h" +#include "lang/verify.h" + +const rpcc::TO rpcc::to_max = { 120000 }; +const rpcc::TO rpcc::to_min = { 1000 }; + +rpcc::caller::caller(unsigned int xxid, unmarshall *xun) +: xid(xxid), un(xun), done(false) +{ + VERIFY(pthread_mutex_init(&m,0) == 0); + VERIFY(pthread_cond_init(&c, 0) == 0); +} + +rpcc::caller::~caller() +{ + VERIFY(pthread_mutex_destroy(&m) == 0); + VERIFY(pthread_cond_destroy(&c) == 0); +} + +inline +void set_rand_seed() +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + srandom((int)ts.tv_nsec^((int)getpid())); +} + +rpcc::rpcc(sockaddr_in d, bool retrans) : + dst_(d), srv_nonce_(0), bind_done_(false), xid_(1), lossytest_(0), + retrans_(retrans), reachable_(true), chan_(NULL), destroy_wait_ (false), xid_rep_done_(-1) +{ + VERIFY(pthread_mutex_init(&m_, 0) == 0); + 
VERIFY(pthread_mutex_init(&chan_m_, 0) == 0); + VERIFY(pthread_cond_init(&destroy_wait_c_, 0) == 0); + + if(retrans){ + set_rand_seed(); + clt_nonce_ = random(); + } else { + // special client nonce 0 means this client does not + // require at-most-once logic from the server + // because it uses tcp and never retries a failed connection + clt_nonce_ = 0; + } + + char *loss_env = getenv("RPC_LOSSY"); + if(loss_env != NULL){ + lossytest_ = atoi(loss_env); + } + + // xid starts with 1 and latest received reply starts with 0 + xid_rep_window_.push_back(0); + + jsl_log(JSL_DBG_2, "rpcc::rpcc cltn_nonce is %d lossy %d\n", + clt_nonce_, lossytest_); +} + +// IMPORTANT: destruction should happen only when no external threads +// are blocked inside rpcc or will use rpcc in the future +rpcc::~rpcc() +{ + jsl_log(JSL_DBG_2, "rpcc::~rpcc delete nonce %d channo=%d\n", + clt_nonce_, chan_?chan_->channo():-1); + if(chan_){ + chan_->closeconn(); + chan_->decref(); + } + VERIFY(calls_.size() == 0); + VERIFY(pthread_mutex_destroy(&m_) == 0); + VERIFY(pthread_mutex_destroy(&chan_m_) == 0); +} + +int +rpcc::bind(TO to) +{ + int r; + int ret = call(rpc_const::bind, 0, r, to); + if(ret == 0){ + ScopedLock ml(&m_); + bind_done_ = true; + srv_nonce_ = r; + } else { + jsl_log(JSL_DBG_2, "rpcc::bind %s failed %d\n", + inet_ntoa(dst_.sin_addr), ret); + } + return ret; +}; + +// Cancel all outstanding calls +void +rpcc::cancel(void) +{ + ScopedLock ml(&m_); + printf("rpcc::cancel: force callers to fail\n"); + std::map::iterator iter; + for(iter = calls_.begin(); iter != calls_.end(); iter++){ + caller *ca = iter->second; + + jsl_log(JSL_DBG_2, "rpcc::cancel: force caller to fail\n"); + { + ScopedLock cl(&ca->m); + ca->done = true; + ca->intret = rpc_const::cancel_failure; + VERIFY(pthread_cond_signal(&ca->c) == 0); + } + } + + while (calls_.size () > 0){ + destroy_wait_ = true; + VERIFY(pthread_cond_wait(&destroy_wait_c_,&m_) == 0); + } + printf("rpcc::cancel: done\n"); +} + +int 
+rpcc::call1(unsigned int proc, marshall &req, unmarshall &rep, + TO to) +{ + + caller ca(0, &rep); + int xid_rep; + { + ScopedLock ml(&m_); + + if((proc != rpc_const::bind && !bind_done_) || + (proc == rpc_const::bind && bind_done_)){ + jsl_log(JSL_DBG_1, "rpcc::call1 rpcc has not been bound to dst or binding twice\n"); + return rpc_const::bind_failure; + } + + if(destroy_wait_){ + return rpc_const::cancel_failure; + } + + ca.xid = xid_++; + calls_[ca.xid] = &ca; + + req_header h(ca.xid, proc, clt_nonce_, srv_nonce_, + xid_rep_window_.front()); + req.pack_req_header(h); + xid_rep = xid_rep_window_.front(); + } + + TO curr_to; + struct timespec now, nextdeadline, finaldeadline; + + clock_gettime(CLOCK_REALTIME, &now); + add_timespec(now, to.to, &finaldeadline); + curr_to.to = to_min.to; + + bool transmit = true; + connection *ch = NULL; + + while (1){ + if(transmit){ + get_refconn(&ch); + if(ch){ + if(reachable_) { + request forgot; + { + ScopedLock ml(&m_); + if (dup_req_.isvalid() && xid_rep_done_ > dup_req_.xid) { + forgot = dup_req_; + dup_req_.clear(); + } + } + if (forgot.isvalid()) + ch->send((char *)forgot.buf.c_str(), forgot.buf.size()); + ch->send(req.cstr(), req.size()); + } + else jsl_log(JSL_DBG_1, "not reachable\n"); + jsl_log(JSL_DBG_2, + "rpcc::call1 %u just sent req proc %x xid %u clt_nonce %d\n", + clt_nonce_, proc, ca.xid, clt_nonce_); + } + transmit = false; // only send once on a given channel + } + + if(!finaldeadline.tv_sec) + break; + + clock_gettime(CLOCK_REALTIME, &now); + add_timespec(now, curr_to.to, &nextdeadline); + if(cmp_timespec(nextdeadline,finaldeadline) > 0){ + nextdeadline = finaldeadline; + finaldeadline.tv_sec = 0; + } + + { + ScopedLock cal(&ca.m); + while (!ca.done){ + jsl_log(JSL_DBG_2, "rpcc:call1: wait\n"); + if(pthread_cond_timedwait(&ca.c, &ca.m, + &nextdeadline) == ETIMEDOUT){ + jsl_log(JSL_DBG_2, "rpcc::call1: timeout\n"); + break; + } + } + if(ca.done){ + jsl_log(JSL_DBG_2, "rpcc::call1: reply received\n"); + break; 
+ } + } + + if(retrans_ && (!ch || ch->isdead())){ + // since connection is dead, retransmit + // on the new connection + transmit = true; + } + curr_to.to <<= 1; + } + + { + // no locking of ca.m since only this thread changes ca.xid + ScopedLock ml(&m_); + calls_.erase(ca.xid); + // may need to update the xid again here, in case the + // packet times out before it's even sent by the channel. + // I don't think there's any harm in maybe doing it twice + update_xid_rep(ca.xid); + + if(destroy_wait_){ + VERIFY(pthread_cond_signal(&destroy_wait_c_) == 0); + } + } + + if (ca.done && lossytest_) + { + ScopedLock ml(&m_); + if (!dup_req_.isvalid()) { + dup_req_.buf.assign(req.cstr(), req.size()); + dup_req_.xid = ca.xid; + } + if (xid_rep > xid_rep_done_) + xid_rep_done_ = xid_rep; + } + + ScopedLock cal(&ca.m); + + jsl_log(JSL_DBG_2, + "rpcc::call1 %u call done for req proc %x xid %u %s:%d done? %d ret %d \n", + clt_nonce_, proc, ca.xid, inet_ntoa(dst_.sin_addr), + ntohs(dst_.sin_port), ca.done, ca.intret); + + if(ch) + ch->decref(); + + // destruction of req automatically frees its buffer + return (ca.done? ca.intret : rpc_const::timeout_failure); +} + +void +rpcc::get_refconn(connection **ch) +{ + ScopedLock ml(&chan_m_); + if(!chan_ || chan_->isdead()){ + if(chan_) + chan_->decref(); + chan_ = connect_to_dst(dst_, this, lossytest_); + } + if(ch && chan_){ + if(*ch){ + (*ch)->decref(); + } + *ch = chan_; + (*ch)->incref(); + } +} + +// PollMgr's thread is being used to +// make this upcall from connection object to rpcc. +// this funtion must not block. 
+// +// this function keeps no reference for connection *c +bool +rpcc::got_pdu(connection *c, char *b, int sz) +{ + unmarshall rep(b, sz); + reply_header h; + rep.unpack_reply_header(&h); + + if(!rep.ok()){ + jsl_log(JSL_DBG_1, "rpcc:got_pdu unmarshall header failed!!!\n"); + return true; + } + + ScopedLock ml(&m_); + + update_xid_rep(h.xid); + + if(calls_.find(h.xid) == calls_.end()){ + jsl_log(JSL_DBG_2, "rpcc::got_pdu xid %d no pending request\n", h.xid); + return true; + } + caller *ca = calls_[h.xid]; + + ScopedLock cl(&ca->m); + if(!ca->done){ + ca->un->take_in(rep); + ca->intret = h.ret; + if(ca->intret < 0){ + jsl_log(JSL_DBG_2, "rpcc::got_pdu: RPC reply error for xid %d intret %d\n", + h.xid, ca->intret); + } + ca->done = 1; + } + VERIFY(pthread_cond_broadcast(&ca->c) == 0); + return true; +} + +// assumes thread holds mutex m +void +rpcc::update_xid_rep(unsigned int xid) +{ + std::list::iterator it; + + if(xid <= xid_rep_window_.front()){ + return; + } + + for (it = xid_rep_window_.begin(); it != xid_rep_window_.end(); it++){ + if(*it > xid){ + xid_rep_window_.insert(it, xid); + goto compress; + } + } + xid_rep_window_.push_back(xid); + +compress: + it = xid_rep_window_.begin(); + for (it++; it != xid_rep_window_.end(); it++){ + while (xid_rep_window_.front() + 1 == *it) + xid_rep_window_.pop_front(); + } +} + + +rpcs::rpcs(unsigned int p1, int count) + : port_(p1), counting_(count), curr_counts_(count), lossytest_(0), reachable_ (true) +{ + VERIFY(pthread_mutex_init(&procs_m_, 0) == 0); + VERIFY(pthread_mutex_init(&count_m_, 0) == 0); + VERIFY(pthread_mutex_init(&reply_window_m_, 0) == 0); + VERIFY(pthread_mutex_init(&conss_m_, 0) == 0); + + set_rand_seed(); + nonce_ = random(); + jsl_log(JSL_DBG_2, "rpcs::rpcs created with nonce %d\n", nonce_); + + char *loss_env = getenv("RPC_LOSSY"); + if(loss_env != NULL){ + lossytest_ = atoi(loss_env); + } + + reg(rpc_const::bind, this, &rpcs::rpcbind); + dispatchpool_ = new ThrPool(6,false); + + listener_ = new 
tcpsconn(this, port_, lossytest_); +} + +rpcs::~rpcs() +{ + // must delete listener before dispatchpool + delete listener_; + delete dispatchpool_; + free_reply_window(); +} + +bool +rpcs::got_pdu(connection *c, char *b, int sz) +{ + if(!reachable_){ + jsl_log(JSL_DBG_1, "rpcss::got_pdu: not reachable\n"); + return true; + } + + djob_t *j = new djob_t(c, b, sz); + c->incref(); + bool succ = dispatchpool_->addObjJob(this, &rpcs::dispatch, j); + if(!succ || !reachable_){ + c->decref(); + delete j; + } + return succ; +} + +void +rpcs::reg1(unsigned int proc, handler *h) +{ + ScopedLock pl(&procs_m_); + VERIFY(procs_.count(proc) == 0); + procs_[proc] = h; + VERIFY(procs_.count(proc) >= 1); +} + +void +rpcs::updatestat(unsigned int proc) +{ + ScopedLock cl(&count_m_); + counts_[proc]++; + curr_counts_--; + if(curr_counts_ == 0){ + std::map::iterator i; + printf("RPC STATS: "); + for (i = counts_.begin(); i != counts_.end(); i++){ + printf("%x:%d ", i->first, i->second); + } + printf("\n"); + + ScopedLock rwl(&reply_window_m_); + std::map >::iterator clt; + + unsigned int totalrep = 0, maxrep = 0; + for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + totalrep += clt->second.size(); + if(clt->second.size() > maxrep) + maxrep = clt->second.size(); + } + jsl_log(JSL_DBG_1, "REPLY WINDOW: clients %d total reply %d max per client %d\n", + (int) reply_window_.size(), totalrep, maxrep); + curr_counts_ = counting_; + } +} + +void +rpcs::dispatch(djob_t *j) +{ + connection *c = j->conn; + unmarshall req(j->buf, j->sz); + delete j; + + req_header h; + req.unpack_req_header(&h); + int proc = h.proc; + + if(!req.ok()){ + jsl_log(JSL_DBG_1, "rpcs:dispatch unmarshall header failed!!!\n"); + c->decref(); + return; + } + + jsl_log(JSL_DBG_2, + "rpcs::dispatch: rpc %u (proc %x, last_rep %u) from clt %u for srv instance %u \n", + h.xid, proc, h.xid_rep, h.clt_nonce, h.srv_nonce); + + marshall rep; + reply_header rh(h.xid,0); + + // is client sending to an old instance 
of server? + if(h.srv_nonce != 0 && h.srv_nonce != nonce_){ + jsl_log(JSL_DBG_2, + "rpcs::dispatch: rpc for an old server instance %u (current %u) proc %x\n", + h.srv_nonce, nonce_, h.proc); + rh.ret = rpc_const::oldsrv_failure; + rep.pack_reply_header(rh); + c->send(rep.cstr(),rep.size()); + return; + } + + handler *f; + // is RPC proc a registered procedure? + { + ScopedLock pl(&procs_m_); + if(procs_.count(proc) < 1){ + fprintf(stderr, "rpcs::dispatch: unknown proc %x.\n", + proc); + c->decref(); + VERIFY(0); + return; + } + + f = procs_[proc]; + } + + rpcs::rpcstate_t stat; + char *b1; + int sz1; + + if(h.clt_nonce){ + // have i seen this client before? + { + ScopedLock rwl(&reply_window_m_); + // if we don't know about this clt_nonce, create a cleanup object + if(reply_window_.find(h.clt_nonce) == reply_window_.end()){ + VERIFY (reply_window_[h.clt_nonce].size() == 0); // create + jsl_log(JSL_DBG_2, + "rpcs::dispatch: new client %u xid %d chan %d, total clients %d\n", + h.clt_nonce, h.xid, c->channo(), (int)reply_window_.size()); + } + } + + // save the latest good connection to the client + { + ScopedLock rwl(&conss_m_); + if(conns_.find(h.clt_nonce) == conns_.end()){ + c->incref(); + conns_[h.clt_nonce] = c; + } else if(conns_[h.clt_nonce]->compare(c) < 0){ + conns_[h.clt_nonce]->decref(); + c->incref(); + conns_[h.clt_nonce] = c; + } + } + + stat = checkduplicate_and_update(h.clt_nonce, h.xid, + h.xid_rep, &b1, &sz1); + } else { + // this client does not require at most once logic + stat = NEW; + } + + switch (stat){ + case NEW: // new request + if(counting_){ + updatestat(proc); + } + + rh.ret = f->fn(req, rep); + if (rh.ret == rpc_const::unmarshal_args_failure) { + fprintf(stderr, "rpcs::dispatch: failed to" + " unmarshall the arguments. 
You are" + " probably calling RPC 0x%x with wrong" + " types of arguments.\n", proc); + VERIFY(0); + } + VERIFY(rh.ret >= 0); + + rep.pack_reply_header(rh); + rep.take_buf(&b1,&sz1); + + jsl_log(JSL_DBG_2, + "rpcs::dispatch: sending and saving reply of size %d for rpc %u, proc %x ret %d, clt %u\n", + sz1, h.xid, proc, rh.ret, h.clt_nonce); + + if(h.clt_nonce > 0){ + // only record replies for clients that require at-most-once logic + add_reply(h.clt_nonce, h.xid, b1, sz1); + } + + // get the latest connection to the client + { + ScopedLock rwl(&conss_m_); + if(c->isdead() && c != conns_[h.clt_nonce]){ + c->decref(); + c = conns_[h.clt_nonce]; + c->incref(); + } + } + + c->send(b1, sz1); + if(h.clt_nonce == 0){ + // reply is not added to at-most-once window, free it + free(b1); + } + break; + case INPROGRESS: // server is working on this request + break; + case DONE: // duplicate and we still have the response + c->send(b1, sz1); + break; + case FORGOTTEN: // very old request and we don't have the response anymore + jsl_log(JSL_DBG_2, "rpcs::dispatch: very old request %u from %u\n", + h.xid, h.clt_nonce); + rh.ret = rpc_const::atmostonce_failure; + rep.pack_reply_header(rh); + c->send(rep.cstr(),rep.size()); + break; + } + c->decref(); +} + +// rpcs::dispatch calls this when an RPC request arrives. +// +// checks to see if an RPC with xid from clt_nonce has already been received. +// if not, remembers the request in reply_window_. +// +// deletes remembered requests with XIDs <= xid_rep; the client +// says it has received a reply for every RPC up through xid_rep. +// frees the reply_t::buf of each such request. +// +// returns one of: +// NEW: never seen this xid before. +// INPROGRESS: seen this xid, and still processing it. +// DONE: seen this xid, previous reply returned in *b and *sz. +// FORGOTTEN: might have seen this xid, but deleted previous reply. 
+rpcs::rpcstate_t +rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid, + unsigned int xid_rep, char **b, int *sz) +{ + ScopedLock rwl(&reply_window_m_); + + // You fill this in for Lab 1. + std::list *rlist = &(reply_window_[clt_nonce]); + std::list::iterator it; + rpcs::rpcstate_t ret = NEW; + + // if xid matches and cb_present, DONE! Else INPROGRESS + for (it = rlist->begin(); it != rlist->end(); it++) { + if ((*it).xid == xid) { + if ((*it).cb_present) { + *b = (*it).buf; + *sz = (*it).sz; + return DONE; + } else + return INPROGRESS; + } + } + + // if xid is too old, FORGOTTEN + if (rlist->size() > 0 && xid < rlist->front().xid) { + return FORGOTTEN; + } + + // insert new one + reply_t reply(xid); + reply.cb_present = false; + for (it = rlist->begin(); it != rlist->end(); it++) { + if ((*it).xid > xid) { + rlist->insert(it, reply); + break; + } + } + if (it == rlist->end()) + rlist->push_back(reply); + + // delete old ones + for (it = rlist->begin(); it != rlist->end(); it++) { + if ((*it).xid >= xid_rep) + break; + } + rlist->erase(rlist->begin(), it); + + return ret; +} + +// rpcs::dispatch calls add_reply when it is sending a reply to an RPC, +// and passes the return value in b and sz. +// add_reply() should remember b and sz. +// free_reply_window() and checkduplicate_and_update is responsible for +// calling free(b). 
+void +rpcs::add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz) +{ + ScopedLock rwl(&reply_window_m_); + + std::list *rlist = &(reply_window_[clt_nonce]); + std::list::iterator it; + for (it = rlist->begin(); it != rlist->end(); it++) { + if ((*it).xid == xid) { + (*it).sz = sz; + (*it).buf = b; + (*it).cb_present = true; + } + } +} + +void +rpcs::free_reply_window(void) +{ + std::map >::iterator clt; + std::list::iterator it; + + ScopedLock rwl(&reply_window_m_); + for (clt = reply_window_.begin(); clt != reply_window_.end(); clt++){ + for (it = clt->second.begin(); it != clt->second.end(); it++){ + free((*it).buf); + } + clt->second.clear(); + } + reply_window_.clear(); +} + +// rpc handler +int +rpcs::rpcbind(int a, int &r) +{ + jsl_log(JSL_DBG_2, "rpcs::rpcbind called return nonce %u\n", nonce_); + r = nonce_; + return 0; +} + +void +marshall::rawbyte(unsigned char x) +{ + if(_ind >= _capa){ + _capa *= 2; + VERIFY (_buf != NULL); + _buf = (char *)realloc(_buf, _capa); + VERIFY(_buf); + } + _buf[_ind++] = x; +} + +void +marshall::rawbytes(const char *p, int n) +{ + if((_ind+n) > _capa){ + _capa = _capa > n? 
2*_capa:(_capa+n); + VERIFY (_buf != NULL); + _buf = (char *)realloc(_buf, _capa); + VERIFY(_buf); + } + memcpy(_buf+_ind, p, n); + _ind += n; +} + +marshall & +operator<<(marshall &m, bool x) +{ + m.rawbyte(x); + return m; +} + +marshall & +operator<<(marshall &m, unsigned char x) +{ + m.rawbyte(x); + return m; +} + +marshall & +operator<<(marshall &m, char x) +{ + m << (unsigned char) x; + return m; +} + + +marshall & +operator<<(marshall &m, unsigned short x) +{ + m.rawbyte((x >> 8) & 0xff); + m.rawbyte(x & 0xff); + return m; +} + +marshall & +operator<<(marshall &m, short x) +{ + m << (unsigned short) x; + return m; +} + +marshall & +operator<<(marshall &m, unsigned int x) +{ + // network order is big-endian + m.rawbyte((x >> 24) & 0xff); + m.rawbyte((x >> 16) & 0xff); + m.rawbyte((x >> 8) & 0xff); + m.rawbyte(x & 0xff); + return m; +} + +marshall & +operator<<(marshall &m, int x) +{ + m << (unsigned int) x; + return m; +} + +marshall & +operator<<(marshall &m, const std::string &s) +{ + m << (unsigned int) s.size(); + m.rawbytes(s.data(), s.size()); + return m; +} + +marshall & +operator<<(marshall &m, unsigned long long x) +{ + m << (unsigned int) (x >> 32); + m << (unsigned int) x; + return m; +} + +void +marshall::pack(int x) +{ + rawbyte((x >> 24) & 0xff); + rawbyte((x >> 16) & 0xff); + rawbyte((x >> 8) & 0xff); + rawbyte(x & 0xff); +} + +void +unmarshall::unpack(int *x) +{ + (*x) = (rawbyte() & 0xff) << 24; + (*x) |= (rawbyte() & 0xff) << 16; + (*x) |= (rawbyte() & 0xff) << 8; + (*x) |= rawbyte() & 0xff; +} + +// take the contents from another unmarshall object +void +unmarshall::take_in(unmarshall &another) +{ + if(_buf) + free(_buf); + another.take_buf(&_buf, &_sz); + _ind = RPC_HEADER_SZ; + _ok = _sz >= RPC_HEADER_SZ?true:false; +} + +bool +unmarshall::okdone() +{ + if(ok() && _ind == _sz){ + return true; + } else { + return false; + } +} + +unsigned int +unmarshall::rawbyte() +{ + char c = 0; + if(_ind >= _sz) + _ok = false; + else + c = _buf[_ind++]; 
+ return c; +} + +unmarshall & +operator>>(unmarshall &u, bool &x) +{ + x = (bool) u.rawbyte() ; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned char &x) +{ + x = (unsigned char) u.rawbyte() ; + return u; +} + +unmarshall & +operator>>(unmarshall &u, char &x) +{ + x = (char) u.rawbyte(); + return u; +} + + +unmarshall & +operator>>(unmarshall &u, unsigned short &x) +{ + x = (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, short &x) +{ + x = (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned int &x) +{ + x = (u.rawbyte() & 0xff) << 24; + x |= (u.rawbyte() & 0xff) << 16; + x |= (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, int &x) +{ + x = (u.rawbyte() & 0xff) << 24; + x |= (u.rawbyte() & 0xff) << 16; + x |= (u.rawbyte() & 0xff) << 8; + x |= u.rawbyte() & 0xff; + return u; +} + +unmarshall & +operator>>(unmarshall &u, unsigned long long &x) +{ + unsigned int h, l; + u >> h; + u >> l; + x = l | ((unsigned long long) h << 32); + return u; +} + +unmarshall & +operator>>(unmarshall &u, std::string &s) +{ + unsigned sz; + u >> sz; + if(u.ok()) + u.rawbytes(s, sz); + return u; +} + +void +unmarshall::rawbytes(std::string &ss, unsigned int n) +{ + if((_ind+n) > (unsigned)_sz){ + _ok = false; + } else { + std::string tmps = std::string(_buf+_ind, n); + swap(ss, tmps); + VERIFY(ss.size() == n); + _ind += n; + } +} + +bool operator<(const sockaddr_in &a, const sockaddr_in &b){ + return ((a.sin_addr.s_addr < b.sin_addr.s_addr) || + ((a.sin_addr.s_addr == b.sin_addr.s_addr) && + ((a.sin_port < b.sin_port)))); +} + +/*---------------auxilary function--------------*/ +void +make_sockaddr(const char *hostandport, struct sockaddr_in *dst){ + + char host[200]; + const char *localhost = "127.0.0.1"; + const char *port = index(hostandport, ':'); + if(port == NULL){ + 
memcpy(host, localhost, strlen(localhost)+1); + port = hostandport; + } else { + memcpy(host, hostandport, port-hostandport); + host[port-hostandport] = '\0'; + port++; + } + + make_sockaddr(host, port, dst); + +} + +void +make_sockaddr(const char *host, const char *port, struct sockaddr_in *dst){ + + in_addr_t a; + + bzero(dst, sizeof(*dst)); + dst->sin_family = AF_INET; + + a = inet_addr(host); + if(a != INADDR_NONE){ + dst->sin_addr.s_addr = a; + } else { + struct hostent *hp = gethostbyname(host); + if(hp == 0 || hp->h_length != 4){ + fprintf(stderr, "cannot find host name %s\n", host); + exit(1); + } + dst->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr))->s_addr; + } + dst->sin_port = htons(atoi(port)); +} + +int +cmp_timespec(const struct timespec &a, const struct timespec &b) +{ + if(a.tv_sec > b.tv_sec) + return 1; + else if(a.tv_sec < b.tv_sec) + return -1; + else { + if(a.tv_nsec > b.tv_nsec) + return 1; + else if(a.tv_nsec < b.tv_nsec) + return -1; + else + return 0; + } +} + +void +add_timespec(const struct timespec &a, int b, struct timespec *result) +{ + // convert to millisec, add timeout, convert back + result->tv_sec = a.tv_sec + b/1000; + result->tv_nsec = a.tv_nsec + (b % 1000) * 1000000; + VERIFY(result->tv_nsec >= 0); + while (result->tv_nsec > 1000000000){ + result->tv_sec++; + result->tv_nsec-=1000000000; + } +} + +int +diff_timespec(const struct timespec &end, const struct timespec &start) +{ + int diff = (end.tv_sec > start.tv_sec)?(end.tv_sec-start.tv_sec)*1000:0; + VERIFY(diff || end.tv_sec == start.tv_sec); + if(end.tv_nsec > start.tv_nsec){ + diff += (end.tv_nsec-start.tv_nsec)/1000000; + } else { + diff -= (start.tv_nsec-end.tv_nsec)/1000000; + } + return diff; +} diff --git a/rpc/rpc.h b/rpc/rpc.h new file mode 100644 index 0000000..999810c --- /dev/null +++ b/rpc/rpc.h @@ -0,0 +1,627 @@ +#ifndef rpc_h +#define rpc_h + +#include +#include +#include +#include +#include + +#include "thr_pool.h" +#include "marshall.h" +#include 
"connection.h" + +#ifdef DMALLOC +#include "dmalloc.h" +#endif + +class rpc_const { + public: + static const unsigned int bind = 1; // handler number reserved for bind + static const int timeout_failure = -1; + static const int unmarshal_args_failure = -2; + static const int unmarshal_reply_failure = -3; + static const int atmostonce_failure = -4; + static const int oldsrv_failure = -5; + static const int bind_failure = -6; + static const int cancel_failure = -7; +}; + +// rpc client endpoint. +// manages a xid space per destination socket +// threaded: multiple threads can be sending RPCs, +class rpcc : public chanmgr { + + private: + + //manages per rpc info + struct caller { + caller(unsigned int xxid, unmarshall *un); + ~caller(); + + unsigned int xid; + unmarshall *un; + int intret; + bool done; + pthread_mutex_t m; + pthread_cond_t c; + }; + + void get_refconn(connection **ch); + void update_xid_rep(unsigned int xid); + + + sockaddr_in dst_; + unsigned int clt_nonce_; + unsigned int srv_nonce_; + bool bind_done_; + unsigned int xid_; + int lossytest_; + bool retrans_; + bool reachable_; + + connection *chan_; + + pthread_mutex_t m_; // protect insert/delete to calls[] + pthread_mutex_t chan_m_; + + bool destroy_wait_; + pthread_cond_t destroy_wait_c_; + + std::map calls_; + std::list xid_rep_window_; + + struct request { + request() { clear(); } + void clear() { buf.clear(); xid = -1; } + bool isvalid() { return xid != -1; } + std::string buf; + int xid; + }; + struct request dup_req_; + int xid_rep_done_; + public: + + rpcc(sockaddr_in d, bool retrans=true); + ~rpcc(); + + struct TO { + int to; + }; + static const TO to_max; + static const TO to_min; + static TO to(int x) { TO t; t.to = x; return t;} + + unsigned int id() { return clt_nonce_; } + + int bind(TO to = to_max); + + void set_reachable(bool r) { reachable_ = r; } + + void cancel(); + + int islossy() { return lossytest_ > 0; } + + int call1(unsigned int proc, + marshall &req, unmarshall &rep, TO 
to); + + bool got_pdu(connection *c, char *b, int sz); + + + template + int call_m(unsigned int proc, marshall &req, R & r, TO to); + + template + int call(unsigned int proc, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, R & r, + TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, const A6 & a6, + R & r, TO to = to_max); + template + int call(unsigned int proc, const A1 & a1, const A2 & a2, const A3 & a3, + const A4 & a4, const A5 & a5, const A6 &a6, const A7 &a7, + R & r, TO to = to_max); + +}; + +template int +rpcc::call_m(unsigned int proc, marshall &req, R & r, TO to) +{ + unmarshall u; + int intret = call1(proc, req, u, to); + if (intret < 0) return intret; + u >> r; + if(u.okdone() != true) { + fprintf(stderr, "rpcc::call_m: failed to unmarshall the reply." 
+ "You are probably calling RPC 0x%x with wrong return " + "type.\n", proc); + VERIFY(0); + return rpc_const::unmarshal_reply_failure; + } + return intret; +} + +template int +rpcc::call(unsigned int proc, R & r, TO to) +{ + marshall m; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, R & r, TO to) +{ + marshall m; + m << a1; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, + const A6 & a6, R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + m << a6; + return call_m(proc, m, r, to); +} + +template int +rpcc::call(unsigned int proc, const A1 & a1, const A2 & a2, + const A3 & a3, const A4 & a4, const A5 & a5, + const A6 & a6, const A7 & a7, + R & r, TO to) +{ + marshall m; + m << a1; + m << a2; + m << a3; + m << a4; + m << a5; + m << a6; + m << a7; + return call_m(proc, m, r, to); +} + +bool operator<(const sockaddr_in &a, const sockaddr_in &b); + +class handler { + public: + handler() { } + virtual ~handler() { } + virtual int fn(unmarshall &, marshall &) = 0; 
+}; + + +// rpc server endpoint. +class rpcs : public chanmgr { + + typedef enum { + NEW, // new RPC, not a duplicate + INPROGRESS, // duplicate of an RPC we're still processing + DONE, // duplicate of an RPC we already replied to (have reply) + FORGOTTEN, // duplicate of an old RPC whose reply we've forgotten + } rpcstate_t; + + private: + + // state about an in-progress or completed RPC, for at-most-once. + // if cb_present is true, then the RPC is complete and a reply + // has been sent; in that case buf points to a copy of the reply, + // and sz holds the size of the reply. + struct reply_t { + reply_t (unsigned int _xid) { + xid = _xid; + cb_present = false; + buf = NULL; + sz = 0; + } + unsigned int xid; + bool cb_present; // whether the reply buffer is valid + char *buf; // the reply buffer + int sz; // the size of reply buffer + }; + + int port_; + unsigned int nonce_; + + // provide at most once semantics by maintaining a window of replies + // per client that that client hasn't acknowledged receiving yet. + // indexed by client nonce. 
+ std::map > reply_window_; + + void free_reply_window(void); + void add_reply(unsigned int clt_nonce, unsigned int xid, char *b, int sz); + + rpcstate_t checkduplicate_and_update(unsigned int clt_nonce, + unsigned int xid, unsigned int rep_xid, + char **b, int *sz); + + void updatestat(unsigned int proc); + + // latest connection to the client + std::map conns_; + + // counting + const int counting_; + int curr_counts_; + std::map counts_; + + int lossytest_; + bool reachable_; + + // map proc # to function + std::map procs_; + + pthread_mutex_t procs_m_; // protect insert/delete to procs[] + pthread_mutex_t count_m_; //protect modification of counts + pthread_mutex_t reply_window_m_; // protect reply window et al + pthread_mutex_t conss_m_; // protect conns_ + + + protected: + + struct djob_t { + djob_t (connection *c, char *b, int bsz):buf(b),sz(bsz),conn(c) {} + char *buf; + int sz; + connection *conn; + }; + void dispatch(djob_t *); + + // internal handler registration + void reg1(unsigned int proc, handler *); + + ThrPool* dispatchpool_; + tcpsconn* listener_; + + public: + rpcs(unsigned int port, int counts=0); + ~rpcs(); + + //RPC handler for clients binding + int rpcbind(int a, int &r); + + void set_reachable(bool r) { reachable_ = r; } + + bool got_pdu(connection *c, char *b, int sz); + + // register a handler + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1 a1, const A2, + R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + R & r)); + template + void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + const A6, R & r)); + template + 
void reg(unsigned int proc, S*, int (S::*meth)(const A1, const A2, + const A3, const A4, const A5, + const A6, const A7, + R & r)); +}; + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + R r; + args >> a1; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + R r; + args >> a1; + args >> a2; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + R r; + args >> a1; + args >> a2; + args >> a3; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 
a4, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + args >> a5; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, a5, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, const A6 a6, + R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, const A6 a6, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, const A6 a6, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + A6 a6; + R r; + args >> a1; + args >> a2; + 
args >> a3; + args >> a4; + args >> a5; + args >> a6; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + +template void +rpcs::reg(unsigned int proc, S*sob, int (S::*meth)(const A1 a1, const A2 a2, + const A3 a3, const A4 a4, + const A5 a5, const A6 a6, + const A7 a7, R & r)) +{ + class h1 : public handler { + private: + S * sob; + int (S::*meth)(const A1 a1, const A2 a2, const A3 a3, const A4 a4, + const A5 a5, const A6 a6, const A7 a7, R & r); + public: + h1(S *xsob, int (S::*xmeth)(const A1 a1, const A2 a2, const A3 a3, + const A4 a4, const A5 a5, const A6 a6, + const A7 a7, R & r)) + : sob(xsob), meth(xmeth) { } + int fn(unmarshall &args, marshall &ret) { + A1 a1; + A2 a2; + A3 a3; + A4 a4; + A5 a5; + A6 a6; + A7 a7; + R r; + args >> a1; + args >> a2; + args >> a3; + args >> a4; + args >> a5; + args >> a6; + args >> a7; + if(!args.okdone()) + return rpc_const::unmarshal_args_failure; + int b = (sob->*meth)(a1, a2, a3, a4, a5, a6, a7, r); + ret << r; + return b; + } + }; + reg1(proc, new h1(sob, meth)); +} + + +void make_sockaddr(const char *hostandport, struct sockaddr_in *dst); +void make_sockaddr(const char *host, const char *port, + struct sockaddr_in *dst); + +int cmp_timespec(const struct timespec &a, const struct timespec &b); +void add_timespec(const struct timespec &a, int b, struct timespec *result); +int diff_timespec(const struct timespec &a, const struct timespec &b); + +#endif diff --git a/rpc/rpctest.cc b/rpc/rpctest.cc new file mode 100644 index 0000000..74c61d1 --- /dev/null +++ b/rpc/rpctest.cc @@ -0,0 +1,479 @@ +// RPC test and pseudo-documentation. 
+// generates print statements on failures, but eventually says "rpctest OK"
+
+#include "rpc.h"
+#include <arpa/inet.h>
+#include <stdlib.h>
+#include <string.h>
+#include <getopt.h>
+#include <unistd.h>
+#include "jsl_log.h"
+#include "gettime.h"
+#include "lang/verify.h"
+
+#define NUM_CL 2 // number of rpcc client objects used by the tests
+
+rpcs *server; // server rpc object
+rpcc *clients[NUM_CL]; // client rpc objects
+struct sockaddr_in dst; // server's ip address
+int port; // server port: -p option, otherwise derived from the pid in main()
+pthread_attr_t attr; // attributes for the test threads, initialized in main()
+
+// server-side handlers. they must be methods of some class
+// to simplify rpcs::reg(). a server process can have handlers
+// from multiple classes.
+class srv {
+ public:
+  int handle_22(const std::string a, const std::string b, std::string & r); // registered as proc 22: r = a + b
+  int handle_fast(const int a, int &r); // registered as proc 23: r = a + 1
+  int handle_slow(const int a, int &r); // registered as proc 24: r = a + 2 after a short random sleep
+  int handle_bigrep(const int a, std::string &r); // registered as proc 25: r = a copies of 'x'
+};
+
+// a handler. a and b are arguments, r is the result.
+// there can be multiple arguments but only one result.
+// the caller also gets to see the int return value
+// as the return value from rpcc::call().
+// rpcs::reg() decides how to unmarshall by looking
+// at these argument types, so this function definition
+// does what a .x file does in SunRPC.
+int
+srv::handle_22(const std::string a, const std::string b, std::string &r) // r = a followed by b; always returns 0
+{
+  r = a + b;
+  return 0;
+}
+
+int
+srv::handle_fast(const int a, int &r) // r = a + 1 with no delay; always returns 0
+{
+  r = a + 1;
+  return 0;
+}
+
+int
+srv::handle_slow(const int a, int &r) // r = a + 2 after sleeping 0..4999 microseconds
+{
+  usleep(random() % 5000);
+  r = a + 2;
+  return 0;
+}
+
+int
+srv::handle_bigrep(const int len, std::string &r) // r = len copies of 'x'; always returns 0
+{
+  r = std::string(len, 'x');
+  return 0;
+}
+
+srv service;
+
+void startserver() // create the global server on `port` and register procs 22-25 on `service`
+{
+  server = new rpcs(port);
+  server->reg(22, &service, &srv::handle_22);
+  server->reg(23, &service, &srv::handle_fast);
+  server->reg(24, &service, &srv::handle_slow);
+  server->reg(25, &service, &srv::handle_bigrep);
+}
+
+void
+testmarshall() // round-trip a request header, an int, a long long and a string through marshall/unmarshall
+{
+  marshall m;
+  req_header rh(1,2,3,4,5);
+  m.pack_req_header(rh);
+  VERIFY(m.size()==RPC_HEADER_SZ);
+  int i = 12345;
+  unsigned long long l = 1223344455L;
+  std::string s = std::string("hallo....");
+  m << i;
+  m << l;
+  m << s;
+
+  char *b;
+  int sz;
+  m.take_buf(&b,&sz);
+  VERIFY(sz == (int)(RPC_HEADER_SZ+sizeof(i)+sizeof(l)+s.size()+sizeof(int))); // presumably the extra int is the string's length prefix
+
+  unmarshall un(b,sz);
+  req_header rh1;
+  un.unpack_req_header(&rh1);
+  VERIFY(memcmp(&rh,&rh1,sizeof(rh))==0);
+  int i1;
+  unsigned long long l1;
+  std::string s1;
+  un >> i1;
+  un >> l1;
+  un >> s1;
+  VERIFY(un.okdone());
+  VERIFY(i1==i && l1==l && s1==s);
+}
+
+void *
+client1(void *xx) // thread body: xx is a thread index, mapped onto one of the NUM_CL clients
+{
+
+  // test concurrency.
+  int which_cl = ((unsigned long) xx ) % NUM_CL;
+
+  for(int i = 0; i < 100; i++){
+    int arg = (random() % 2000);
+    std::string rep;
+    int ret = clients[which_cl]->call(25, arg, rep);
+    VERIFY(ret == 0);
+    if ((int)rep.size()!=arg) {
+      printf("repsize wrong %d!=%d\n", (int)rep.size(), arg);
+    }
+    VERIFY((int)rep.size() == arg);
+  }
+
+  // test rpc replies coming back not in the order of
+  // the original calls -- i.e. does xid reply dispatch work.
+  for(int i = 0; i < 100; i++){
+    int which = (random() % 2);
+    int arg = (random() % 1000);
+    int rep;
+
+    struct timespec start,end;
+    clock_gettime(CLOCK_REALTIME, &start);
+
+    int ret = clients[which_cl]->call(which ? 23 : 24, arg, rep); // 23 = handle_fast, 24 = handle_slow
+    clock_gettime(CLOCK_REALTIME, &end);
+    int diff = diff_timespec(end, start);
+    if (ret != 0)
+      printf("%d ms have elapsed!!!\n", diff);
+    VERIFY(ret == 0);
+    VERIFY(rep == (which ? arg+1 : arg+2));
+  }
+
+  return 0;
+}
+
+void *
+client2(void *xx) // thread body: request random-size replies from proc 25 for about 10 seconds
+{
+  int which_cl = ((unsigned long) xx ) % NUM_CL;
+
+  time_t t1;
+  time(&t1);
+
+  while(time(0) - t1 < 10){
+    int arg = (random() % 2000);
+    std::string rep;
+    int ret = clients[which_cl]->call(25, arg, rep);
+    if ((int)rep.size()!=arg) {
+      printf("ask for %d reply got %d ret %d\n",
+          arg, (int)rep.size(), ret);
+    }
+    VERIFY((int)rep.size() == arg);
+  }
+  return 0;
+}
+
+void *
+client3(void *xx) // thread body: xx is the rpcc to use; each call may time out or must return i+2
+{
+  rpcc *c = (rpcc *) xx;
+
+  for(int i = 0; i < 4; i++){
+    int rep;
+    int ret = c->call(24, i, rep, rpcc::to(3000));
+    VERIFY(ret == rpc_const::timeout_failure || rep == i+2);
+  }
+  return 0;
+}
+
+void
+simple_tests(rpcc *c) // single-threaded checks on an already bound client c
+{
+  printf("simple_tests\n");
+  // an RPC call to procedure #22.
+  // rpcc::call() looks at the argument types to decide how
+  // to marshall the RPC call packet, and how to unmarshall
+  // the reply packet.
+  std::string rep;
+  int intret = c->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+  VERIFY(intret == 0); // this is what handle_22 returns
+  VERIFY(rep == "hello goodbye");
+  printf(" -- string concat RPC .. ok\n");
+
+  // small request, big reply (perhaps req via UDP, reply via TCP)
+  intret = c->call(25, 70000, rep, rpcc::to(200000));
+  VERIFY(intret == 0);
+  VERIFY(rep.size() == 70000);
+  printf(" -- small request, big reply .. ok\n");
+
+#if 0
+  // too few arguments
+  intret = c->call(22, (std::string)"just one", rep);
+  VERIFY(intret < 0);
+  printf(" -- too few arguments .. failed ok\n");
+
+  // too many arguments; proc #23 expects just one.
+  intret = c->call(23, 1001, 1002, rep);
+  VERIFY(intret < 0);
+  printf(" -- too many arguments .. failed ok\n");
+
+  // wrong return value size
+  int wrongrep;
+  intret = c->call(23, (std::string)"hello", (std::string)" goodbye", wrongrep);
+  VERIFY(intret < 0);
+  printf(" -- wrong ret value size .. failed ok\n");
+#endif
+
+  // specify a timeout value to an RPC that should succeed (udp)
+  int xx = 0;
+  intret = c->call(23, 77, xx, rpcc::to(3000));
+  VERIFY(intret == 0 && xx == 78);
+  printf(" -- no suprious timeout .. ok\n");
+
+  // specify a timeout value to an RPC that should succeed (tcp)
+  {
+    std::string arg(1000, 'x');
+    std::string rep;
+    c->call(22, arg, (std::string)"x", rep, rpcc::to(3000));
+    VERIFY(rep.size() == 1001);
+    printf(" -- no suprious timeout .. ok\n");
+  }
+
+  // huge RPC
+  std::string big(1000000, 'x');
+  intret = c->call(22, big, (std::string)"z", rep);
+  VERIFY(rep.size() == 1000001);
+  printf(" -- huge 1M rpc request .. ok\n");
+
+  // specify a timeout value to an RPC that should timeout (udp)
+  struct sockaddr_in non_existent;
+  memset(&non_existent, 0, sizeof(non_existent));
+  non_existent.sin_family = AF_INET;
+  non_existent.sin_addr.s_addr = inet_addr("127.0.0.1");
+  non_existent.sin_port = htons(7661);
+  rpcc *c1 = new rpcc(non_existent);
+  time_t t0 = time(0);
+  intret = c1->bind(rpcc::to(3000));
+  time_t t1 = time(0);
+  VERIFY(intret < 0 && (t1 - t0) <= 4);
+  printf(" -- rpc timeout .. ok\n");
+  delete c1; // the unbound client is no longer needed (failure_test() deletes its failed client the same way)
+  printf("simple_tests OK\n");
+}
+
+void
+concurrent_test(int nt) // run client1() on nt threads at once and wait for all of them
+{
+  // create threads that make lots of calls in parallel,
+  // to test thread synchronization for concurrent calls
+  // and dispatches.
+  int ret;
+
+  printf("start concurrent_test (%d threads) ...", nt);
+
+  pthread_t th[nt];
+  for(int i = 0; i < nt; i++){
+    ret = pthread_create(&th[i], &attr, client1, (void *) (uintptr_t)i);
+    VERIFY(ret == 0);
+  }
+
+  for(int i = 0; i < nt; i++){
+    VERIFY(pthread_join(th[i], NULL) == 0);
+  }
+  printf(" OK\n");
+}
+
+void
+lossy_test() // rerun client2() with RPC_LOSSY=5 in the environment, then reset it to 0
+{
+  int ret;
+
+  printf("start lossy_test ...");
+  VERIFY(setenv("RPC_LOSSY", "5", 1) == 0);
+
+  if (server) { // presumably recreated so the new RPC_LOSSY value is picked up -- TODO confirm
+    delete server;
+    startserver();
+  }
+
+  for (int i = 0; i < NUM_CL; i++) {
+    delete clients[i];
+    clients[i] = new rpcc(dst);
+    VERIFY(clients[i]->bind()==0);
+  }
+
+  const int nt = 1; // const so that th[] below is an ordinary fixed-size array
+  pthread_t th[nt];
+  for(int i = 0; i < nt; i++){
+    ret = pthread_create(&th[i], &attr, client2, (void *) (uintptr_t)i);
+    VERIFY(ret == 0);
+  }
+  for(int i = 0; i < nt; i++){
+    VERIFY(pthread_join(th[i], NULL) == 0);
+  }
+  printf(".. OK\n");
+  VERIFY(setenv("RPC_LOSSY", "0", 1) == 0);
+}
+
+void
+failure_test() // delete and recreate the server and clients[0], checking each expected failure and recovery
+{
+  rpcc *client1;
+  rpcc *client = clients[0];
+
+  printf("failure_test\n");
+
+  delete server;
+
+  client1 = new rpcc(dst);
+  VERIFY (client1->bind(rpcc::to(3000)) < 0);
+  printf(" -- create new client and try to bind to failed server .. failed ok\n");
+
+  delete client1;
+
+  startserver();
+
+  std::string rep;
+  int intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+  VERIFY(intret == rpc_const::oldsrv_failure);
+  printf(" -- call recovered server with old client .. failed ok\n");
+
+  delete client;
+
+  clients[0] = client = new rpcc(dst);
+  VERIFY (client->bind() >= 0);
+  VERIFY (client->bind() < 0); // a second bind() on the same client is expected to fail
+
+  intret = client->call(22, (std::string)"hello", (std::string)" goodbye", rep);
+  VERIFY(intret == 0);
+  VERIFY(rep == "hello goodbye");
+
+  printf(" -- delete existing rpc client, create replacement rpc client .. ok\n");
+
+
+  const int nt = 10; // const so that th[] below is an ordinary fixed-size array
+  int ret;
+  printf(" -- concurrent test on new rpc client w/ %d threads ..", nt);
+
+  pthread_t th[nt];
+  for(int i = 0; i < nt; i++){
+    ret = pthread_create(&th[i], &attr, client3, (void *) client);
+    VERIFY(ret == 0);
+  }
+
+  for(int i = 0; i < nt; i++){
+    VERIFY(pthread_join(th[i], NULL) == 0);
+  }
+  printf("ok\n");
+
+  delete server;
+  delete client;
+
+  startserver();
+  clients[0] = client = new rpcc(dst);
+  VERIFY (client->bind() >= 0);
+  printf(" -- delete existing rpc client and server, create replacements.. ok\n");
+
+  printf(" -- concurrent test on new client and server w/ %d threads ..", nt);
+  for(int i = 0; i < nt; i++){
+    ret = pthread_create(&th[i], &attr, client3, (void *)client);
+    VERIFY(ret == 0);
+  }
+
+  for(int i = 0; i < nt; i++){
+    VERIFY(pthread_join(th[i], NULL) == 0);
+  }
+  printf("ok\n");
+
+  printf("failure_test OK\n");
+}
+
+int
+main(int argc, char *argv[]) // options: -c client, -s server, -d <debug level>, -p <port>, -l lossy; neither -c nor -s means both
+{
+
+  setvbuf(stdout, NULL, _IONBF, 0);
+  setvbuf(stderr, NULL, _IONBF, 0);
+  int debug_level = 0;
+
+  bool isclient = false;
+  bool isserver = false;
+
+  srandom(getpid());
+  port = 20000 + (getpid() % 10000);
+
+  int ch = 0; // getopt() returns int; with an unsigned char type the -1 end marker would never match
+  while ((ch = getopt(argc, argv, "csd:p:l"))!=-1) {
+    switch (ch) {
+      case 'c':
+        isclient = true;
+        break;
+      case 's':
+        isserver = true;
+        break;
+      case 'd':
+        debug_level = atoi(optarg);
+        break;
+      case 'p':
+        port = atoi(optarg);
+        break;
+      case 'l':
+        VERIFY(setenv("RPC_LOSSY", "5", 1) == 0); // falls through to default, which only breaks
+      default:
+        break;
+    }
+  }
+
+  if (!isserver && !isclient) {
+    isserver = isclient = true;
+  }
+
+  if (debug_level > 0) {
+    //__loginit.initNow();
+    jsl_set_debug(debug_level);
+    jsl_log(JSL_DBG_1, "DEBUG LEVEL: %d\n", debug_level);
+  }
+
+  testmarshall();
+
+  pthread_attr_init(&attr);
+  // set stack size to 32K, so we don't run out of memory
+  pthread_attr_setstacksize(&attr, 32*1024);
+
+  if (isserver) {
+    printf("starting server on port %d RPC_HEADER_SZ %d\n", port, RPC_HEADER_SZ);
+    startserver();
+  }
+
+  if (isclient) {
+    // server's address.
+    memset(&dst, 0, sizeof(dst));
+    dst.sin_family = AF_INET;
+    dst.sin_addr.s_addr = inet_addr("127.0.0.1");
+    dst.sin_port = htons(port);
+
+
+    // start the client. bind it to the server.
+    // starts a thread to listen for replies and hand them to
+    // the correct waiting caller thread. there should probably
+    // be only one rpcc per process. you probably need one
+    // rpcc per server.
+    for (int i = 0; i < NUM_CL; i++) {
+      clients[i] = new rpcc(dst);
+      VERIFY (clients[i]->bind() == 0);
+    }
+
+    simple_tests(clients[0]);
+    concurrent_test(10);
+    lossy_test();
+    if (isserver) { // failure_test() deletes and restarts the server, so it needs one in this process
+      failure_test();
+    }
+
+    printf("rpctest OK\n");
+
+    exit(0);
+  }
+
+  while (1) { // server-only mode: keep the process alive
+    sleep(1);
+  }
+}
diff --git a/rpc/slock.h b/rpc/slock.h
new file mode 100644
index 0000000..4876fcb
--- /dev/null
+++ b/rpc/slock.h
@@ -0,0 +1,23 @@
+#ifndef __SCOPED_LOCK__
+#define __SCOPED_LOCK__
+
+#include <pthread.h>
+#include "lang/verify.h"
+
+// Scope guard for a pthread mutex: the constructor locks *m and the
+// destructor unlocks it, so the mutex is held for exactly the lifetime
+// of the guard. Both calls are checked with VERIFY.
+// Do not copy a ScopedLock: a copy shares m_, so the mutex would be
+// unlocked once per copy.
+struct ScopedLock {
+ private:
+  pthread_mutex_t *m_; // mutex held by this guard; never destroyed here
+ public:
+  ScopedLock(pthread_mutex_t *m): m_(m) {
+    VERIFY(pthread_mutex_lock(m_)==0);
+  }
+  ~ScopedLock() {
+    VERIFY(pthread_mutex_unlock(m_)==0);
+  }
+};
+#endif /*__SCOPED_LOCK__*/
diff --git a/rpc/thr_pool.cc b/rpc/thr_pool.cc
new file mode 100644
index 0000000..f9f32fa
--- /dev/null
+++ b/rpc/thr_pool.cc
@@ -0,0 +1,75 @@
+#include "slock.h"
+#include "thr_pool.h"
+#include <pthread.h>
+#include <stdlib.h>
+#include "lang/verify.h"
+
+// Thread entry point for pool workers: repeatedly take a job from the
+// pool and run it, exiting once takeJob() returns false.
+static void *
+do_worker(void *arg)
+{
+  ThrPool *tp = (ThrPool *)arg;
+  while (1) {
+    ThrPool::job_t j;
+    if (!tp->takeJob(&j))
+      break; //die
+
+    (void)(j.f)(j.a);
+  }
+  pthread_exit(NULL);
+}
+
+//if blocking, then addJob() blocks when queue is full
+//otherwise, addJob() simply returns false when queue is full
+//starts sz worker threads with 128KB stacks; the job queue is constructed with 100*sz
+ThrPool::ThrPool(int sz, bool blocking)
+: nthreads_(sz),blockadd_(blocking),jobq_(100*sz)
+{
+  pthread_attr_init(&attr_);
+  pthread_attr_setstacksize(&attr_, 128<<10);
+
+  for (int i = 0; i < sz; i++) {
+    pthread_t t;
+    VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0);
+    th_.push_back(t);
+  }
+}
+
+//IMPORTANT: this function can be called only when no external thread
+//will ever use this thread pool again or is currently blocking on it
+ThrPool::~ThrPool()
+{
+  for (int i = 0; i < nthreads_; i++) {
+    job_t j;
+    j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit
+    j.a = NULL; //unused by the pill, but avoid enqueueing an uninitialized pointer
+    jobq_.enq(j);
+  }
+
+  for (int i = 0; i < nthreads_; i++) {
+    VERIFY(pthread_join(th_[i], NULL)==0);
+  }
+
+  VERIFY(pthread_attr_destroy(&attr_)==0);
+}
+
+//queue f(a) for a worker thread; returns whatever the queue's enq() returns
+bool
+ThrPool::addJob(void *(*f)(void *), void *a)
+{
+  job_t j;
+  j.f = f;
+  j.a = a;
+
+  return jobq_.enq(j,blockadd_);
+}
+
+//dequeue the next job into *j; returns false iff its function pointer is NULL (the poison pill)
+bool
+ThrPool::takeJob(job_t *j)
+{
+  jobq_.deq(j);
+  return (j->f!=NULL);
+}
+
diff --git a/rpc/thr_pool.h b/rpc/thr_pool.h
new file mode 100644
index 0000000..5095961
--- /dev/null
+++ b/rpc/thr_pool.h
@@ -0,0 +1,75 @@
+#ifndef __THR_POOL__
+#define __THR_POOL__
+
+#include <pthread.h>
+#include <vector>
+
+#include "fifo.h"
+
+// Fixed set of worker threads that run jobs taken from a shared fifo.
+class ThrPool {
+
+
+ public:
+  struct job_t {
+    void *(*f)(void *); //function pointer
+    void *a; //function arguments
+  };
+
+  ThrPool(int sz, bool blocking=true);
+  ~ThrPool();
+  template<class C, class A> bool addObjJob(C *o, void (C::*m)(A), A a);
+  void waitDone(); // NOTE(review): no definition appears in thr_pool.cc
+
+  bool takeJob(job_t *j);
+
+ private:
+  pthread_attr_t attr_;
+  int nthreads_;
+  bool blockadd_;
+
+
+  fifo<job_t> jobq_;
+  std::vector<pthread_t> th_;
+
+  bool addJob(void *(*f)(void *), void *a);
+};
+
+// Queue the call (o->*m)(a) for a worker thread.
+// o, m and a are copied into a heap-allocated wrapper that the worker
+// frees after making the call. Returns addJob()'s result; when the job
+// is not queued the wrapper is freed here, as no worker will ever see it.
+template<class C, class A> bool
+ThrPool::addObjJob(C *o, void (C::*m)(A), A a)
+{
+
+  class objfunc_wrapper {
+   public:
+    C *o;
+    void (C::*m)(A a);
+    A a;
+    static void *func(void *vvv) {
+      objfunc_wrapper *x = (objfunc_wrapper*)vvv;
+      C *o = x->o;
+      void (C::*m)(A ) = x->m;
+      A a = x->a;
+      (o->*m)(a);
+      delete x;
+      return 0;
+    }
+  };
+
+  objfunc_wrapper *x = new objfunc_wrapper;
+  x->o = o;
+  x->m = m;
+  x->a = a;
+  if (!addJob(&objfunc_wrapper::func, (void *)x)) {
+    delete x; // not queued: no worker will free the wrapper
+    return false;
+  }
+  return true;
+}
+
+
+#endif
+