// Author: Leslie S. Liu (shihuali@usc.edu) // Written on April 22, 2003 // IMSC , University of Southern California // File: pp-app.cc // Written for : ns-2.26 // Last Updated: Arpril 24, 2003 #include "random.h" #include "pp-app.h" #include "simulator.h" // PPApp OTcl linkage class static class PPAppClass : public TclClass { public: PPAppClass() : TclClass("Application/PPApp") {} TclObject* create(int, const char*const*) { return (new PPApp); } } class_app_pp; // When snd_timer_ expires call PPApp:send_voice_pkt() // to send out the audio stream void SendVoiceTimer::expire(Event*) { t_->send_voice_pkt(); //printf("[DEBUG] Timer snd_timer is invoked at time (%f) from node <%d> \n",Scheduler::instance().clock(),t_); } // When check_timer_ expires call PPApp:send_check_pkt() // to check the critical connection status // This depends on the catergory of the nodes void CheckTimer::expire(Event*) { //t_->send_check_pkt(); } // Constructor (also initialize instances of timers) PPApp::PPApp() : running_(0), talking_(0),joined_(0),snd_timer_(this), chk_timer_(this) { //this part will export internal variables //to NS interpreter for setting bind_bw("rate0_", &rate[0]); bind_bw("rate1_", &rate[1]); bind_bw("rate2_", &rate[2]); bind_bw("rate3_", &rate[3]); bind_bw("rate4_", &rate[4]); bind("pktsize_", &pktsize_); bind("RP",&RP); //must set if this node is Rendesvous Point bind_bool("random_", &random_); //set internal paramters for(int i=0;isupportPP()) { agent_->enablePP(); } else { printf("agent \"%s\" does not support PP Application", argv[2]); return(TCL_ERROR); } agent_->attachApp(this); return(TCL_OK); } } return (Application::command(argc, argv)); } //tool function to print out current peers of this node void PPApp::print_peer() { printf("[Info] >>>>Peer List at Node[ %d ]<<<< \n\n",this); for(int i=0; i< MAX_DEGREE;i++) { if((int)peerlist[i]!=-1) printf(" =>Node at addr_(%d) \n",peerlist[i]); } printf(" \n\n"); } void PPApp::init() { // PP sequence number (start from 0) seq_ = 0; //following calculate the time needed to send out one packet interval_ = (double)(pktsize_ << 3)/(double)rate[scale_]; } //Start the application //Currently not sure if necessary to call it void PPApp::start() { printf("[Info] << Application at node (%d) starts running >> \n",this); init(); running_ = 1; } void PPApp::stop() { printf("[Info] << Application at node (%d) stopped >> \n",this); running_ = 0; } //Call to start the join process with the agent //that connected with current agent void PPApp::join() { hdr_pp ph_buf; printf("[Info] The JOIN request is made by node ( %d )\n",this); if(running_) { ph_buf.join=1 ; //this is a join packet ph_buf.src_addr=agent_->addr(); //save local addres as source addr of //the packet strcpy(ph_buf.msg_data,"This is a sweet join packet") ; //the content of join packet is dummy ph_buf.nbytes=pktsize_; //set the total size need to transfer ph_buf.time=Scheduler::instance().clock(); //get current time //printf("Sending out join call with data: %s\n",ph_buf.p_data); agent_->sendmsg(pktsize_, (char*) &ph_buf); //send out the packet to agent } else { printf("[Warning] JOIN ignored :: Application at node( %d ) is not running, start it first \n", this); } } // Leave function call void PPApp::leave() { hdr_pp ph_buf; //notify that this node is leaving the group if(joined_) { printf("[Info] LEAVE requst made at(%d)\n",agent_->addr()); ph_buf.leave=1; //this is a leave packet ph_buf.src_addr=agent_->addr(); //save local addres as source addr of //the packet strcpy(ph_buf.msg_data,"This is a sad leave packet") ; //the content of join packet is dummy ph_buf.nbytes=pktsize_; //set the total size need to transfer ph_buf.time=Scheduler::instance().clock(); //get current time agent_->sendto(pktsize_, (char*) &ph_buf,parent_node); //send out the packet to parent node if(peer_n>0) //if this node has children node { for(int i=0;isendto(pktsize_,(char*) &ph_buf,peerlist[i]); } } } //leave the group, but the program is still running //so for a graceful leave, we do NOT disable the node now //we will wait for the confirmation back then finally leve //this group } // Talk function call void PPApp::talk(double t_time) { printf("[Info] TALK command is made at node( %d ), talking from %f and last %f\n",this, Scheduler::instance().clock(),t_time); if(running_ &&joined_) //the node is part of the peer group { //set status to talking talking_=1; t_timeover=t_time+Scheduler::instance().clock();//time to stop talking send_voice_pkt(); } else { printf("[Warning] Node( %d ) has not joined the peer to peer group, TALK command ignored. \n", this); } } // Send application data packet void PPApp::send_voice_pkt() { hdr_pp ph_buf; //printf("[DEBUG] PPApp::send_voice_pkt() : localtime = %f , time to stop %f\n",Scheduler::instance().clock(),t_timeover); if (running_ && talking_ &&joined_&& t_timeover>=Scheduler::instance().clock()) { // the below info is passed to UDPpp agent, which will write it // to PP header after packet creation. ph_buf.v_data = 1; // This is a voice stream packet ph_buf.seq = seq_++; // PP sequece number ph_buf.nbytes = pktsize_; // Size of PP packet (NOT UDP packet size) ph_buf.time = Scheduler::instance().clock(); // Current time ph_buf.scale = scale_; // Current scale value ph_buf.src_addr=agent_->addr(); //set local addr of the packet //load the voice packet with dummy data strcpy(ph_buf.msg_data,"::TALKING::DATA::"); for(int i=0; isendto(pktsize_, (char*) &ph_buf,peerlist[i]); // send to UDP } // Reschedule the send_pkt timer //double next_time_ = next_snd_time(); //if(next_time_ > 0) snd_timer_.resched(next_time_); //printf("[DEBUG] Current Time %f , and schedule snd_packet at %f later\n",Scheduler::instance().clock(),interval_); //printf("[DEBUG] Status of Send Timer %d\n", snd_timer_.status()); //printf("[DEBUG] Reschedule timer to wake up in %f in the future\n",interval_); snd_timer_.resched(interval_); } else { //talking is over printf("[Info] Talking at Node( %d ) is stopped at time ( %f ) \n", this,Scheduler::instance().clock()); talking_=0; t_timeover=0; } } // Schedule next data packet transmission time // This function is only useful when we need to dynamically // ajust the trafer rate. Currently it is just a dummy double PPApp::next_snd_time() { // Recompute interval in case rate or size chages interval_ = (double)(pktsize_ << 3)/(double)rate[scale_]; double next_time_ = interval_; if(random_) next_time_ += interval_ * Random::uniform(-0.5, 0.5); return next_time_; } // Receive message from underlying agent void PPApp::recv_msg(int nbytes, const char *msg) { if(msg&&running_)//if the application is runnning { hdr_pp* ph_buf = (hdr_pp*) msg; //incoming hdr hdr_pp ack; //response ack hdr double local_time=Scheduler::instance().clock(); //get local time int t; //if received join request packet if (ph_buf->join == 1) { printf("[Info] JOIN request is received by node ( %d )\n",this); if(RP) //if the applicaton is running as Rendezvous Point { if(peer_n<=0) //no core in the RP so far { t=next_slot(); peerlist[t]=ph_buf->src_addr; //add this node to core list peer_n++; printf("[Info] RP(%d) has added node at addr_(%d) as core \n",this, ph_buf->src_addr); //set up an ack packet to ask the new core ack.ack_join=1; //this is ack to join strcpy(ack.msg_data,"JOIN_ACK_CREATE_CORE"); //loaded the control msg ack.src_addr=agent_->addr(); //set the local address ack.nbytes= pktsize_; //packet size ack.seq=0; //do not know if necessary to set to 0 or not ack.time=Scheduler::instance().clock(); //finally we can send out the packet agent_->sendto(pktsize_, (char*) &ack,ph_buf->src_addr); //send out the packet to agent } else //else then ask the node to join to an existing core { //Select one core for use //currently we are only working as single core system //so this will always be the peerlist[0] //But more interesting thing can be done in future int select_core=0; //packet refer the node to a core ack.ack_join=1; //this is ack to join strcpy(ack.msg_data,"JOIN_ACK_REFER"); //loaded the control msg ack.ref_addr=peerlist[select_core]; //which core to join ack.src_addr=agent_->addr(); //set the local address ack.nbytes= pktsize_; //packet size ack.seq=0; //do not know if necessary to set to 0 or not ack.time=Scheduler::instance().clock(); //finally we can send out the packet agent_->sendto(pktsize_, (char*) &ack,ph_buf->src_addr); //send out the packet to agent } } if(RP!=1) // For all other nodes (Core.. first peer) the join process is same now. { //delay is an important decidsion factor and need to calculate //by application. However, in simulation, all application share //a global time, thus make this job easy. double delay_time=local_time-ph_buf->time; if(delay_time <= MAX_DELAY) //ok, within service range { //check if the core can accept one more peer if((t=next_slot())>=0) { peerlist[t]=ph_buf->src_addr; //add this node to children peer_n++; printf("[Info] Node(%d) has added node at(%d) as children \n",this, ph_buf->src_addr); //set up an ack packet to info the new node ack.ack_join=1; //this is ack to join strcpy(ack.msg_data,"JOIN_ACK_SUCCESSFUL"); //loaded the control msg ack.src_addr=agent_->addr(); //set the local address ack.nbytes= pktsize_; //packet size ack.seq=0; //do not know if necessary to set to 0 or not ack.time=Scheduler::instance().clock(); //finally we can send out the packet agent_->sendto(pktsize_, (char*) &ack,ph_buf->src_addr); //send out the packet to agent print_peer(); } else { //select a exiting peer to refer the join request int select_node = Random::random()%MAX_DEGREE; //randomly select a node //packet refer the node to a core ack.ack_join=1; //this is ack to join strcpy(ack.msg_data,"JOIN_ACK_REFER"); //loaded the control msg ack.ref_addr=peerlist[select_node]; //which node refer to ack.src_addr=agent_->addr(); //set the local address ack.nbytes= pktsize_; //packet size ack.seq=0; //do not know if necessary to set to 0 or not ack.time=Scheduler::instance().clock(); //finally we can send out the packet agent_->sendto(pktsize_, (char*) &ack,ph_buf->src_addr); //send out the packet to agent printf("[Warning] Node(%d) if full, JOIN referred.\n",this); } } else //this part is simple and need update.. right now, if the node has no child , join fail { int select_node; //select a exiting peer to refer the join request if(peer_n >0) // join can be done. { select_node = Random::random()% MAX_DEGREE; //randomly select a node //packet refer the node to a core ack.ack_join=1; //this is ack to join strcpy(ack.msg_data,"JOIN_ACK_REFER"); //loaded the control msg ack.ref_addr=peerlist[select_node]; //which node refer to ack.src_addr=agent_->addr(); //set the local address ack.nbytes= pktsize_; //packet size ack.seq=0; //do not know if necessary to set to 0 or not ack.time=Scheduler::instance().clock(); //finally we can send out the packet agent_->sendto(pktsize_, (char*) &ack,ph_buf->src_addr); //send out the packet to agent printf("[Warning] Node at(%d) is too far from Node(%d), JOIN referred.\n",ph_buf->src_addr,this); } else //no node to join { //packet refer the node to a core ack.ack_join=1; //this is ack to join strcpy(ack.msg_data,"JOIN_ACK_FAIL"); //loaded the control msg ack.src_addr=agent_->addr(); //set the local address ack.nbytes= pktsize_; //packet size ack.seq=0; //do not know if necessary to set to 0 or not ack.time=Scheduler::instance().clock(); agent_->sendto(pktsize_, (char*) &ack,ph_buf->src_addr); //send out the packet to agent //printf("[Warning] Node at(%d) is too far from Node(%d), JOIN referred.\n",ph_buf->src_addr,this); } } }//end of if RP!=1 }//end of Join processing //forwarding all data stream packet if(ph_buf->v_data==1 && joined_) // Now it is a voice data packet { //printf("[DEBUG] PPApp::recv_msg() get a voic data packet at node addr_( %d )\n", agent_->addr()); //print_peer(); //Set ack packet type to v_data ack.v_data=1; //we send just forward the voice packet to all nodes. for(int i=0;isrc_addr!=peerlist[i])&&(int)peerlist[i]!=-1) { agent_->sendto(pktsize_,(char*) &ack, peerlist[i]); printf("[DEBUG] Forwarding voice packet from addr_( %d ) to addr_( %d )\n",agent_->addr(),peerlist[i]); } } } //if received ack packet for JOIN request if(ph_buf->ack_join == 1) { hdr_pp join_hdr; printf("[DATA] Received ACK packet with data( %s )\n",ph_buf->msg_data); // check up the ACK packet type if(strcmp(ph_buf->msg_data,"JOIN_ACK_CREATE_CORE")==0) //create a new core { core=1; //set this node to core joined_=1; //set itself the guy in group printf("[Info] Node( %d ) at(%d) has been set as core \n",this,agent_->addr()); } else if(strcmp(ph_buf->msg_data,"JOIN_ACK_REFER")==0) //create a new core { //first save the source node as backup node bk_node=ph_buf->src_addr; //send join packet to referred node join_hdr.join=1 ; //this is a join packet join_hdr.src_addr=agent_->addr(); //save local addres as source addr of //the packet strcpy(join_hdr.msg_data,"This is a sweet join packet") ; //the content of join packet is dummy join_hdr.nbytes=pktsize_; //set the total size need to transfer join_hdr.time=Scheduler::instance().clock(); //get current time //printf("Sending out join call with data: %s\n",ph_buf.p_data); agent_->sendto(pktsize_, (char*) &join_hdr, ph_buf->ref_addr); //send out the packet to agent } else if(strcmp(ph_buf->msg_data,"JOIN_ACK_SUCCESSFUL")==0) // Join successful { //setup parent node parent_node=ph_buf->src_addr; //set the nodes status to joined joined_=1; //since the node just join, there must be empty slot peerlist[next_slot()]=ph_buf->src_addr; //add parents to peer list peer_n++; //increase the peer number print_peer(); } else if(strcmp(ph_buf->msg_data,"JOIN_ACK_FAIL")==0) // Join failed { joined_=0; //redundant line, just for safty printf("[ERROR] JOIN request by node (%d) at(%d) is failed \n",this,agent_->addr()); } else printf("[Warning] Unknown ACK_JOIN packet type\n"); }//end of if join_ack //if this is a leave packet if (ph_buf->leave==1) { printf("[Info] Node@< %d > received leave request from Node@< %d >\n",agent_->addr(),ph_buf->src_addr); for(int i=0;isrc_addr) { printf("[Info] LEAVE:Node@<%d> remove Node@<%d> from peerlist\n",agent_->addr(),ph_buf->src_addr); peerlist[i]=-1; peer_n--; //send comfirmation back ack.ack_leave=1; //this is ack to join strcpy(ack.msg_data,"LEAVE_ACK_SUCCESSFUL"); //loaded the control msg ack.src_addr=agent_->addr(); //set the local address ack.nbytes= pktsize_; //packet sizei ack.seq=0; //do not know if necessary to set to 0 or not ack.time=Scheduler::instance().clock(); agent_->sendto(pktsize_, (char*) &ack,ph_buf->src_addr); //send out the packet to agent //if the node leaving is the parent.. this node need to find a new parent if(ph_buf->src_addr==parent_node) { //right now.. just simply join again join(); } } } }//end of leave //if this is a ack to leave request if (ph_buf->ack_leave==1) { if(strcmp(ph_buf->msg_data,"LEAVE_ACK_SUCCESSFUL")==0) //Successful leave ack { peer_n--; //reduce peer number //note that we have a very weak check here to see if all peers are left if(peer_n ==0) //all peers are left { joined_=0; talking_=0; for(int i=0;iaddr()); } } }//end of ack_leave }//end of if running & msg else { printf("[Warning] Received packet but application not running (%d)\n",this->running_); } }//end of function // Sender sets its scale to what reciver notifies void PPApp::set_scale(const hdr_pp *ph_buf) { scale_ = ph_buf->scale; } void PPApp::account_recv_pkt(const hdr_pp *ph_buf) { double local_time = Scheduler::instance().clock(); // Calculate RTT if(ph_buf->seq == 0) { init_recv_pkt_accounting(); p_accnt.rtt = 2*(local_time - ph_buf->time); } else p_accnt.rtt = 0.9 * p_accnt.rtt + 0.1 * 2*(local_time - ph_buf->time); // Count Received packets and Calculate Packet Loss p_accnt.recv_pkts ++; p_accnt.lost_pkts += (ph_buf->seq - p_accnt.last_seq - 1); p_accnt.last_seq = ph_buf->seq; } void PPApp::init_recv_pkt_accounting() { p_accnt.last_seq = -1; p_accnt.last_scale = 0; p_accnt.lost_pkts = 0; p_accnt.recv_pkts = 0; } void PPApp::send_check_pkt(void) { double local_time = Scheduler::instance().clock(); adjust_scale(); // send ack message hdr_pp ack_buf; ack_buf.ack_check = 1; // this packet is ack packet ack_buf.time = local_time; ack_buf.nbytes = 40; // Ack packet size is 40 Bytes ack_buf.scale = p_accnt.last_scale; agent_->sendmsg(ack_buf.nbytes, (char*) &ack_buf); // schedul next ACK time chk_timer_.resched(p_accnt.rtt); } void PPApp::adjust_scale(void) { if(p_accnt.recv_pkts > 0) { if(p_accnt.lost_pkts > 0) p_accnt.last_scale = (int)(p_accnt.last_scale / 2); else { p_accnt.last_scale++; if(p_accnt.last_scale > 4) p_accnt.last_scale = 4; } } p_accnt.recv_pkts = 0; p_accnt.lost_pkts = 0; }