Distributed Monitoring System

A system continuously gathers different metrics like CPU Usage, Memory Usage, File System Read/Write Speed and broadcasts this statistics through the network. I used techniques like shared-memory, pipe and also POSIX APIs for reducing overhead of the application.

server.c


		

#include "stdio.h"
#include "string.h"
#include "stdlib.h"
#include "sys/socket.h"
#include "arpa/inet.h"
#include "unistd.h"
#include "pthread.h"
#include "semaphore.h"
#include "sys/mman.h"
#include "fcntl.h"

#include "Shared_Mem.h"
#include "xml.h"

#define METRIC_SHM_SIZE 2

char client_list[200];

struct Avg_Info {
	char start_time[20];
	int cpu;
	int mem;
	int fwr;
	int fsr;
};

struct Client_t{

	int sock_fd; 	// Socket File Descriptor

	struct Shm_Protect *shared_mem;
};

void set_shm( int value, char *prefix, char *shm_name, char *client_name) { // client->shm_name = str

	char name[50];

	strcpy(name, prefix);
	strcat(name, shm_name);
	strcat(name, client_name);

	int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
	ftruncate(shm_fd, METRIC_SHM_SIZE);
	char *ptr = mmap(0,METRIC_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
	sprintf(ptr, "%d", value);

}

int get_shm(char *prefix, char *metric_name, char *client_name) {

	char shm_name[50];

	strcpy(shm_name, prefix);
	strcat(shm_name, metric_name);
	strcat(shm_name, client_name);

	int shm_fd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
	char *ptr = mmap(0,METRIC_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
	int num = atoi(ptr);


	return num;
}

void process_msg(char* msg) {

	struct Metric *metric = xml_parser(msg);

	if(!strstr(client_list, metric->CLIENT_N))
		sprintf(client_list, "%s\n%s", client_list, metric->CLIENT_N);

	set_shm(metric->content, "", metric->name, metric->CLIENT_N);

//	int last_count = get_shm("count", metric->name, metric->CLIENT_N);
//
//	set_shm(last_count+1, "count", metric->name, metric->CLIENT_N);
//
//	int last_avg = get_shm("avg", metric->name, metric->CLIENT_N);
//
//	last_avg = ((last_avg*last_count) + metric->content) / (last_count+1) ;
//
//	set_shm(last_avg, "avg", metric->name, metric->CLIENT_N);

}

/*
 * This will handle connection for each client
 * */
void *connection_handler(void *input_client){

	struct Client_t *client = input_client;

	while(1) {

		char buffer[4096];

		read(client->sock_fd, buffer, sizeof(buffer));

//		printf("Produced = %s\n", buffer);

		write_shm(buffer, client->shared_mem); // write in shared memory with inner embedded semaphores

	}
}

void *consumer_thread(void * input_shared_mem) {

	struct Shm_Protect* shared_mem = input_shared_mem;

	while(1) {

		char *msg = read_shm(shared_mem);

		process_msg(msg);

//		printf("Consumed = %s\n", msg);

//		sleep(3);
	}
}

void *reporter_thread(void *param) {

	sleep(3);

	char cmd[50];

	while(1) {

		scanf("%s", cmd);

		if(strcmp("Show", cmd) == 0 ) {
			printf("%s\n", client_list);
		}
		else if(strcmp("Now", cmd) == 0 ) {
			scanf("%s", cmd);
			if(strcmp("All", cmd) == 0 ) {
				// !TODO Show All Implementing
				char *name = client_list;

				while(*name != '\0') {
					sscanf(name, "%s", name);

					printf("%s:\n", name);

					printf("Cpu usage: %d%\n", get_shm("", "cpu", name));
					printf("Memory usage: %d \n", get_shm("", "mem", name));
					printf("FS read: %d B/sec\n", get_shm("", "fsr", name));
					name += strlen(name) + 1;

					printf("-----------------------\n");
				}
			}
			else {
				printf("Cpu usage: %d%\n", get_shm("", "cpu", cmd));
				printf("Memory usage: %d \n", get_shm("", "mem", cmd));
				printf("FS write: %d B/sec\n", get_shm("", "fsw", cmd));
				printf("FS read: %d B/sec\n", get_shm("", "fsr", cmd));
			}

		}

		printf("=============================\n");
		sleep(1);
	}
}

int main(int argc , char *argv[])
{
	printf("\n\n\n\n");

	printf("Author: Soheil Shababi - 92213117\n");	
	printf("OS Projcet - Phase 2\n");
	printf("Monitoring Distributer System - Version 2.00\n");

	printf("\n\n\n\n");
	

    int socket_desc , client_sock , c;
    struct sockaddr_in server , client;

    //Create socket
    socket_desc = socket(AF_INET , SOCK_STREAM , 0);
    if (socket_desc == -1)
    {
        printf("Could not create socket");
    }
    puts("Socket created");

    //Prepare the sockaddr_in structure
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = INADDR_ANY;
    server.sin_port = htons( 12345 );

    //Bind
    if( bind(socket_desc,(struct sockaddr *)&server , sizeof(server)) < 0)
    {
        //print the error message
        perror("bind failed. Error");
        return 1;
    }
    puts("bind done");

    //Listen
    listen(socket_desc , 3);

    //Accept and incoming connection
    puts("Waiting for incoming connections...");
    c = sizeof(struct sockaddr_in);

	pthread_t thread_id, consumer_tid, reporter_tid;
    pthread_create( &reporter_tid , NULL ,  reporter_thread , NULL);

    struct Shm_Protect* shared_mem = create_shm("PC_Queue");// Producer-Consumer Queue implementing with Shared Memory
    pthread_create( &consumer_tid , NULL ,  consumer_thread , (void*) shared_mem);

    while( (client_sock = accept(socket_desc, (struct sockaddr *)&client, (socklen_t*)&c)) )
    {
        puts("Connection accepted");

        struct Client_t *client = malloc(sizeof *client);

        client->sock_fd = client_sock;

        client->shared_mem = shared_mem;

        //!TODO: set start_time for every client when connect ro server
        //!TODO: set (avg_metric_shm = 0) for every metric of every client when connect to server
        //!TODO: count number of received each metric for each client

        if( pthread_create( &thread_id , NULL ,  connection_handler , (void*) client) < 0)
        {
            perror("could not create thread");
            return 1;
        }

        //Now join the thread , so that we dont terminate before the thread
        //pthread_join( thread_id , NULL);
        puts("Handler assigned");
    }

    if (client_sock < 0)
    {
        perror("accept failed");
        return 1;
    }

    return 0;
}

		

Metrics.c



/*
 * Metrics.c
 *
 *  Created on: Jan 11, 2016
 *      Author: cl
 */
#include "Metrics.h"

void get_cpuTime(struct CpuTime* cpuTime) {

    FILE *file;

    file = fopen("/proc/stat", "r");

    fscanf(file, "cpu %llu %llu %llu %llu %llu %llu %llu %llu %llu %llu",
    		&cpuTime->user_t, &cpuTime->nice_t, &cpuTime->system_t,
    		&cpuTime->idle_t,&cpuTime->iowait_t, &cpuTime->irq_t,
    		&cpuTime->softirq_t, &cpuTime->steal_t,&cpuTime->guest_t,
    		&cpuTime->guest_nice_t);
}

double evaluate_percent(struct CpuTime *cputime1, struct CpuTime *cputime2){

	double useful_time1 = cputime1->user_t + cputime1->nice_t + cputime1->system_t;
	double total_time1 = useful_time1 + cputime1->idle_t + cputime1->iowait_t + cputime1->irq_t + cputime1->softirq_t + cputime1->steal_t + cputime1->guest_t + cputime1->guest_nice_t;

	double useful_time2 = cputime2->user_t + cputime2->nice_t + cputime2->system_t;
	double total_time2 = useful_time2 + cputime2->idle_t + cputime2->iowait_t + cputime2->irq_t + cputime2->softirq_t + cputime2->steal_t + cputime2->guest_t + cputime2->guest_nice_t;

	double useful = useful_time2 - useful_time1;
	double total = total_time2 - total_time1;

	return (useful/total)*100;
}

/*
 *  evaluate cpu utility percent between two sampling time interval
 *  sampling time is in milliseconds;
 *  You change sampling time to determine the cpu usage measurement or
 *  accuracy resolution of percent.
 */
double cpu_usage() {

	int sampling_time = 500; // sampling time (interval)
    struct CpuTime *cpuTime1 = malloc(sizeof *cpuTime1);
    struct CpuTime *cpuTime2 = malloc(sizeof *cpuTime2);

    get_cpuTime(cpuTime1);

    usleep(sampling_time*1000); // time interval between two consequence cpu time sampling;

    get_cpuTime(cpuTime2);

    double percent = evaluate_percent(cpuTime1, cpuTime2);

    free(cpuTime1);
    free(cpuTime2);

    return percent;

}

void get_memInfo(struct MemInfo *memInfo) {
	int c, i;
	char num_str[20];
	int num[4];
	FILE *file;
	file = fopen("/proc/meminfo", "r");

	for(i=0; i<4; i++) {

		while ((c = getc(file)) != ' '); // read and ignore file characters until current character is not ' ' or space

		while ((c = getc(file)) == ' '); // read and ignore file characters until current character is ' ' or space

		ungetc ( c, file ); 			 // go back 1 character because now we have gone to the number first character

		int j = 0;

		while ((c = getc(file)) != ' ') // reading number
			num_str[j++] = c; 			// store number characters in num_str

		num_str[j] = '\0';				// add last character of num_str '\0' because atoi must understand end of string with '\0'

		num[i] =atoi(num_str);
	}
		memInfo->total = num[0];
		memInfo->free = num[1];
		memInfo->buffers = num[2];
		memInfo->cached = num[3];
}

double memory_usage() {

	struct MemInfo *memInfo = malloc(sizeof *memInfo);

	get_memInfo(memInfo);

    double mem_used = memInfo->total - (memInfo->free + memInfo->buffers + memInfo->cached);

    double percent = (mem_used/memInfo->total)*100.0;;

    free(memInfo);

    sleep(1);

    return percent;
}

char* read_line(FILE *fin) {
    char *buffer;
    char *tmp;
    int read_chars = 0;
    int bufsize = BUFSIZ;
    char *line = malloc(bufsize);

    if ( !line ) {
        return NULL;
    }

    buffer = line;

    while ( fgets(buffer, bufsize - read_chars, fin) ) {
        read_chars = strlen(line);

        if ( line[read_chars - 1] == '\n' ) {
            line[read_chars - 1] = '\0';
            return line;
        }

        else {
            bufsize = 2 * bufsize;
            tmp = realloc(line, bufsize);
            if ( tmp ) {
                line = tmp;
                buffer = line + read_chars;
            }
            else {
                free(line);
                return NULL;
            }
        }
    }
    return NULL;
}

void get_diskInfo(struct DiskInfo *diskInfo) {

    FILE *file;
    char *line;

    char *line_str[BUFSIZ];

    file = fopen("/proc/diskstats", "r"); // this file is same as /sys/block/sda/stat

    char c;
    int num[11];
    char num_str[20];

    int j = 0;

    while ((c = getc(file)) != 's');
    getc(file);
    getc(file);
    getc(file);
    // now file pointer is reached to sda row

    int i;

    for(i=0; i<11; i++) {
    		int j = 0;
    		while ((c = getc(file)) != ' ') // reading number
    			num_str[j++] = c; 			// store number characters in num_str

    		num_str[j] = '\0';				// add last character of num_str '\0' because atoi must understand end of string with '\0'

    		num[i] =atoi(num_str);
    	}

    diskInfo->read_sectors = num[2];
    diskInfo->read_milliseconds = num[3];
    diskInfo->write_sectors = num[6];
    diskInfo->write_milliseconds = num[7];
}


double disk_write_speed(){
	struct DiskInfo *diskInfo1 = malloc(sizeof *diskInfo1);

	get_diskInfo(diskInfo1);

	sleep(1);

	struct DiskInfo *diskInfo2 = malloc(sizeof *diskInfo2);

	get_diskInfo(diskInfo2);

	double write_MegaBytes = (diskInfo2->write_sectors - diskInfo1->write_sectors)*512/100*2;
	double write_blocks = (diskInfo2->write_sectors - diskInfo1->write_sectors);
	double time = diskInfo2->write_milliseconds - diskInfo1->write_milliseconds;
	double speed = (write_blocks/time)*1000.0;

//	free(diskInfo1);
//	free(diskInfo2);

	if(time == 0)
		return 0;
	else
		return speed;
}

double disk_read_speed(){
	struct DiskInfo *diskInfo1 = malloc(sizeof *diskInfo1);

	get_diskInfo(diskInfo1);

	sleep(1);

	struct DiskInfo *diskInfo2 = malloc(sizeof *diskInfo2);

	get_diskInfo(diskInfo2);

	double read_MegaBytes = (diskInfo2->read_sectors - diskInfo1->read_sectors)*512/100*2;
	double read_blocks = (diskInfo2->read_sectors - diskInfo1->read_sectors);
	double time = diskInfo2->read_milliseconds - diskInfo1->read_milliseconds;
	double speed = (read_blocks/time)*1000.0;

	free(diskInfo1);
	free(diskInfo2);

	if(time == 0)
		return 0;
	else
		return speed;
}

char* get_time() {

	char *return_t = malloc(20 * sizeof(char));

	time_t rawtime;
	struct tm * timeinfo;
	char time_str [80];

	time (&rawtime);
	timeinfo = localtime (&rawtime);

	strftime (time_str,80,"%H:%M:%S",timeinfo);

	strcpy(return_t, time_str);

	return return_t;
}


void test_cpu_usage() {

	while(1) {

		printf("CPU usage: %f\n"
				"Memory usage: %f\n"
				"FS Read: %f Mb/sec\n"
				"==================\n", cpu_usage(), memory_usage(), disk_read_speed());

		sleep(1);
	}
}






		

client.c


#include "stdio.h"
#include "unistd.h"
#include "pthread.h"
#include "sys/types.h"
#include "unistd.h"
#include "string.h"
#include "sys/socket.h"
#include "netinet/in.h"
#include "netdb.h"

#include "xml.h"
#include "Metrics.h"

const char* CLIENT_NAME;		// Client Name would give from command line second input argument
const char* SERVER_IP;		    // Client Name would give from command line first input argument

int fd_cpu[2], fd_mem[2], fd_disk_read[2], fd_disk_write[2]; // pipes file descriptors

int sockfd, portno;
struct sockaddr_in serv_addr;
struct hostent *server;

void write_pipe(char *name, int content, int *fd){

	struct Metric* metric = create_metric(CLIENT_NAME, name, content, get_time());

	char *write_msg = xml_maker(metric);

	write(fd[1], write_msg, strlen(write_msg)+1);
}

struct Metric* read_pipe(int *fd){

	char read_msg[BUFSIZ];

	read(fd[0], read_msg, BUFSIZ);

	write(sockfd, read_msg, strlen(read_msg)); // Send to Server !

	return xml_parser(read_msg);
}

void *thread_cpu(void *param) {

	while(1) {

		write_pipe("cpu", cpu_usage(), fd_cpu);

		sleep(1);
	}
}

void *thread_mem(void *param) {

	while(1) {

		write_pipe("mem", memory_usage(), fd_mem);

		sleep(1);
	}
}

void *thread_disk_read(void *param) {

	while(1) {

		write_pipe("fsr", disk_read_speed(), fd_disk_read);

		sleep(1);
	}
}

void *thread_disk_write(void *param) {

	while(1) {

		write_pipe("fsw", disk_write_speed(), fd_disk_write);

		sleep(1);
	}
}

void process1(){

	pthread_t tid_cpu, tid_mem, tid_disk_read, tid_disk_write; /* the thread identifier */
	pthread_attr_t attr_cpu, attr_mem, attr_disk_read, attr_disk_write; /* set of attributes for the thread */

	pthread_attr_init(&attr_cpu);
	pthread_create(&tid_cpu, &attr_cpu, thread_cpu, NULL);

	pthread_attr_init(&attr_mem);
	pthread_create(&tid_mem, &attr_mem, thread_mem, NULL);

	pthread_attr_init(&attr_disk_read);
	pthread_create(&tid_disk_read, &attr_disk_read, thread_disk_read, NULL);

	pthread_attr_init(&attr_disk_write);
	pthread_create(&tid_disk_write, &attr_disk_write, thread_disk_write, NULL);

	pthread_join(tid_cpu, NULL);
	pthread_join(tid_mem, NULL);
	pthread_join(tid_disk_read, NULL);
	pthread_join(tid_disk_write, NULL);

}

void process2(){

	while(1) {

		struct Metric *metric_cpu = read_pipe(fd_cpu);
		struct Metric *metric_mem = read_pipe(fd_mem);
		struct Metric *metric_disk_r = read_pipe(fd_disk_read);
		struct Metric *metric_disk_w = read_pipe(fd_disk_write);

		time_t rawtime;
		struct tm * timeinfo;
		char time_str [80];

		time (&rawtime);
		timeinfo = localtime (&rawtime);

		strftime (time_str,80,"%H:%M:%S",timeinfo);

		printf( "at %s\n"
				"CPU usage: %d%\n"
				"Memory usage: %d%\n"
				"FS write: %d B/sec\n" // Block per Seconds
				"FS read: %d B/sec\n"  // Block per Seconds
				"=====================\n",
				metric_cpu->time,
				metric_cpu->content,
				metric_mem->content,
				metric_disk_w->content,
				metric_disk_r->content
		);

		free(metric_cpu);
		free(metric_disk_r);
		free(metric_disk_w);
		free(metric_mem);

		sleep(5); // !TODO: sleep(5)
	}
}

void main(int argc, char *argv[])
{

	printf("\n\n\n\n");

	printf("Author: Soheil Shababi - 92213117\n");	
	printf("OS Projcet - Phase 2\n");
	printf("Monitoring Distributer System - Version 2.00\n");

	printf("\n\n\n\n");

	sleep(2);

	// Client Initializing :

	SERVER_IP = argv[1];
	CLIENT_NAME = argv[2];

    portno = 12345;
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
        error("ERROR opening socket");

//    server = gethostbyname("localhost");

    struct in_addr addr;
    inet_aton(SERVER_IP, &addr);
    server = gethostbyaddr(&addr, sizeof(addr), AF_INET);

    if (server == NULL) {
        fprintf(stderr,"ERROR, no such host\n");
        exit(0);
    }
    bzero((char *) &serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    bcopy((char *)server->h_addr,
         (char *)&serv_addr.sin_addr.s_addr,
         server->h_length);
    serv_addr.sin_port = htons(portno);
    if (connect(sockfd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0)
        error("ERROR connecting");

    // write(sockfd, CLIENT_NAME, strlen(CLIENT_NAME));// FIRST PACKET SEND NAME OF CLIENT !

    /// Process Creating :

    pid_t pid;

    pipe(fd_cpu);
    pipe(fd_mem);
    pipe(fd_disk_read);
    pipe(fd_disk_write);

	if ((pid =fork()) == 0) { // child 1
		process1();
	}
	else if(pid > 0) {
		if ((pid = fork()) == 0) { // child 2
			process2();
		}
		else if (pid > 0) { // parent ( main process, root process)
				//printf("Im parent\n");
		}
	}

  }








		

Shared_Mem.c



#include "Shared_Mem.h"


const int SIZE = 500;

struct Shm_Protect*  create_shm(char* name) {

	struct Shm_Protect *shared_mem = malloc(sizeof *shared_mem);

	shared_mem->name = name;
	//create the shared memory segment
	int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
	//configure the size of the shared memory segment
	ftruncate(shm_fd,SIZE);

	//map the shared memory segment in the address space of the process
	shared_mem->first = shared_mem->curr_w = shared_mem->curr_r = mmap(0,SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);

    sem_init(&shared_mem->mutex,0,1);
    sem_init(&shared_mem->empty,0,SIZE);
    sem_init(&shared_mem->full,0,0);

	return shared_mem;
}

void write_shm(char* msg, struct Shm_Protect* shared_mem) {

	sem_wait(&shared_mem->empty);
	sem_wait(&shared_mem->mutex);

	sprintf(shared_mem->curr_w,"%s",msg);

	shared_mem->curr_w =  ((int*)shared_mem->curr_w + strlen(msg)); // For Circular Queue

	if(shared_mem->curr_w > shared_mem->first + SIZE)
		shared_mem->curr_w = shared_mem->first;

	sem_post(&shared_mem->mutex);
	sem_post(&shared_mem->full);
}

char* read_shm(struct Shm_Protect* shared_mem) {

	char *return_val = malloc(4096 * sizeof(char));

	sem_wait(&shared_mem->full);
	sem_wait(&shared_mem->mutex);

	strcpy(return_val, shared_mem->curr_r);

	shared_mem->curr_r = ((int*)shared_mem->curr_r + strlen(return_val))  ; // For Circular Queue

	if(shared_mem->curr_r > shared_mem->first + SIZE)
		shared_mem->curr_r = shared_mem->first;

	sem_post(&shared_mem->mutex);
	sem_post(&shared_mem->empty);

	return return_val;
}

//int main()
//{
//
//	while(1) {
//	struct Shm_Protect* shared_mem = create_shm("sohiel asdf!");
//
//	char buffer[245];
//	bzero(buffer,245);
//	printf("Write msg: ");
//	fgets(buffer,255,stdin);
//
//	write_shm(buffer, shared_mem);
//
//	printf("Read  msg: %s\n", read_shm(shared_mem));
//
//
//	}
//
//	return 0;
//}




		

xml.c



/*
 * xml.c

 *
 *  Created on: Jan 13, 2016
 *      Author: cl
 */

#include "xml.h"

struct Metric* xml_parser(char *xml) {

	struct Metric *metric = malloc(sizeof *metric);
	char *start, *end;

	//finding strstr(xml, "");
	start = strchr(xml, '>');
	start++;
	end = strchr(start, '<');
	strcpy(end, "\0");
	sscanf(start,"%s", metric->CLIENT_N);      // prints xx

	xml = strchr(end+1, '<');

	//finding strstr(xml, "");
	start = strchr(xml, '>');
	start++;
	end = strchr(start, '<');
	strcpy(end, "\0");
	sscanf(start,"%s", metric->name);      // prints xx

	xml = strchr(end+1, '<');

	//finding strstr(xml, "");
	start = strchr(xml, '>');
	start++;
	end = strchr(start, '<');
	strcpy(end, "\0");
	sscanf(start,"%d", &metric->content);

	xml = strchr(end+1, '<');// prints xx

	//finding strstr(xml, "");
	start = strchr(xml, '>');
	start++;
	end = strchr(start, '<');
	strcpy(end, "\0");
	sscanf(start,"%s", &metric->time);

	xml = strchr(end+1, '<');// prints xx

	return metric;
}

char* xml_maker(struct Metric* metric) {

	char *str = malloc(500 * sizeof(char));

	sprintf(str, "%s%s%d"
			"%s",
			metric->CLIENT_N, metric->name, metric->content,  metric->time);

	return str;
}

struct Metric* create_metric(char* CLIENT_I, char* name_i, int content_i, char* time_i) { // _i: means input parameter

	struct Metric *metric = malloc(sizeof *metric);

	strcpy(metric->CLIENT_N, CLIENT_I);
	strcpy(metric->name, name_i);
	metric->content = content_i;
	strcpy(metric->time, time_i);

	return metric;
}