From 45b24cbd4251931fc77fa1171cb1303f1d31deff Mon Sep 17 00:00:00 2001 From: Kevin Date: Thu, 6 Feb 2025 20:33:43 -0500 Subject: [PATCH 1/2] add existing code for kmeans --- .../code/figs/minibatch.png | Bin 0 -> 20640 bytes .../app/config/config_fed_client.json | 32 ++ .../app/config/config_fed_server.json | 56 ++++ .../app/custom/kmeans_assembler.py | 75 +++++ .../app/custom/kmeans_learner.py | 116 +++++++ .../code/jobs/sklearn_kmeans_base/meta.json | 9 + .../code/kmeans_fl_job.py | 52 ++++ .../code/requirements.txt | 5 + .../code/src/kmeans_assembler.py | 75 +++++ .../code/src/kmeans_learner.py | 116 +++++++ .../code/utils/prepare_data.py | 84 +++++ .../code/utils/prepare_job_config.py | 293 ++++++++++++++++++ .../convert_kmeans_to_fl.ipynb | 193 +++++++++++- 13 files changed, 1102 insertions(+), 4 deletions(-) create mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/figs/minibatch.png create mode 100755 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_client.json create mode 100755 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_server.json create mode 100644 
examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_assembler.py create mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_learner.py create mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/meta.json create mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_fl_job.py create mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt create mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_assembler.py create mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_learner.py create mode 100644 
examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_data.py create mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_job_config.py diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/figs/minibatch.png b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/figs/minibatch.png new file mode 100644 index 0000000000000000000000000000000000000000..6c18b663a3b6481c28e00ad96a3a1160fec5fe93 GIT binary patch literal 20640 zcmb5W1yohhw?2xEBB6pP2#BCGh?KM*2?6Pj0|wpQrGlVHNH-!SUD7E?cQ;7Kp}T}P z56bV}H{QMD|9@vVHpo7E?X~8bYtC=Y`OWoFN3egn|3=C|M=K?Yq80VQVFwXqM zJPUuRO}NMpKh9YoMC33rF^9$_f5U(AECrP;Wlgj#ZC;scVaV!QSz2nDzj@bw83W@k zhKRsZIorX-5u2CzT2*yxIu1nhlxJk>KirMIQAx>q=Gl#!XFSt*@0I+hX)$l&tFPj3 zNl0aX6%L?%iZ3OAcTfB|X6%Ktv6SaKIm$vtqM~+ugV?^+(P*BV*xBf7Ud%0NFzLO_ zucJ-oPTz#?Ax3zVKPLKTyU>4KUD?sM{$4SDTtnX|FeU!z<8!pxB}r_eob~c&{TGVk z&lNgCLPBzKfj?iq3^@m1a(TyYPPewP;c&QC$IHv>j+d+5V!x1_)V~8aE4>qrybhyV zTU&j3w{IN~!=DQ6)DDWigolTRvFKQ@jFzbu7&Db^%7}}5Tg9noY7|?}N7;-H#6I%~ z4GpcTstQrJPqvtCm6nz^Gc(IiVp1V zdOSpTPfyu;Bi%@`6>6wprOF-azQ}b%>E2940F`7kw~cXM1~Ts3xAMbvu^}%ejdC7Z zT3Q|+$A#|X>FH^NXIGLWYHL3yC&zfC*y(6*cBsIl54Bb2O9odxJ-uF?vb9>`IAPxz zmNPJx;YA}&coIHziWzFzuWLTupwey$*K(ld<2xF$8AHNoiY#V7T)o|yBA?1sacI8S 
zpPj4I#z;+#hoD!@U+j|PkK(pj<`{FFoS)}3n-B+)vG1qKUiQ=;jm-uRdfCJ?IOoLbJX_!{(dyCgYj^obnvvNC+5)h;^N}( z-@i9EHy0NT%hXLC-bx!HWKut@JcnDK!(%d93ZofP7s%{W9V>7-J{oh}@U}Vtu@2@L z=sXIOh~gr+b4N`@g|=xuHM5+EnAm_M%XA!pGO=8nsDXCV(&%r)P@d%OlI?i&tk+Y# zsq@K8citY*sq-PZfrw8?;N;|VcXwAVuvs1+`1y1EaBri`Vdo~|yBsabnvjqX7uPqY zwTe0j2M5Q>%1TR1>*D)Z#Z2|yC9bFpgT9Q})~M9fR62V4#lgH(s}hwzU2WP+Jnt{z z=4XVrXq%XrpmN~;l3<{ymbkb$JT4*8D#ldCYBBS|pwFL@hM)hq;?FyqF{hn_l?sq! zuEWl9YZUjT^XH|cK%-n|*7yfKLqZgvKle21QDFF?k}i{M==iSYb@naq+OdsbmGpeG zH`fr){WV#Mf8q)!X=`h<_YuVGXm9rm z4+OqP?Toop&^LKsvbD7Z9f;zw>ojD?A{X+xV>j{Z zBj`=O(a`!^=pjaUOI{ntdZx=CR9>ID!(6OKxBiyrH3}k^q~7Dr)oNTt1%>kch18&+ zAW$3i0^_LM+}yn9@D|Ak_LiZc16XGL1tu|DRq1;hvlZKe24MI?w=Vi8%OOXY^E2C4i;G?i6#)S z>b9pT=MImIEbaR^?Qfdd*c{Kr`EJ_t#$1F4erFDyc#cCW>W}DmRf_is^`@(agoWAh zW*W0cY}q?Kefrj@DU#D-)N!LFg59UHvy;zhpOuw0^hF}rlffAt^ov)i3@BWbEUfL&jfxKlUBtN)8XUZRxY3rWk|&D!B{cN0?dtF4krK&p*5u-1 z6-;SW)q`P+HY-%`QmhB5{bF`qTD6cjQIMbCs>5i%A$!HqJ_-!z%2>tmCjz=O;uy#E zhJjoiTw?Y#*(3=N$WXqKs-~u`=FA5vcXoe>+G`A)4Wx65{RcD>$Y&bzPL zqIo^8-hO1zo0b%9FN#Sn(n)Mm+5r>x^Zg~X3&S*99xi%&?$Xi0*m1rOkHt*02@V|- zlgsMQOI@9vFbZAl@!hpaWQn!7h)DKPBMFyfeM7^z1=@f1XL~OWn)4BFo@RM3i!I#NzGN{N&NrC?HH-|D$LR-B$Es>nLVj+wbv)+UZDwgY0 z4KPp;lRm6LP=Xv|ez0)hb->d#BNG&?xgp7OjhNVBcXfPq zb+x}=NjZ22B&FYz!UiK%D_Y;2@9HH~&e0Un*I)4EKYHwPbazc;o;%- z$`}~|!B&Sb*`I<}2+ypnEMA9grDt=aWd~oMzPDZ;rg3|0`uhnOvx*A-7_|4V^f8KO zrlLynCgy z6LXl19yYVI!74s~=@S3`T%3l}R*==fcj-7sC#UMFD)16E$A5N8ii@}BI;y+6SU5Or zV1j=B{7E6~0~#iq_@beqK}1s0*VmVfMf-)4k`k&n{dl{eLRmwzD^VO4N=8P;`1p7f zmlZfmKfJ1@Cb|a?YCAg2+P^-9m2_;>p9$uRhmUVmMFRYKl?Qq~=w7cd*nTD~TnDzO z4W=bOKR+!k4c?6K?Cg}`=dT#s*mQF{KUB|YF{4EI$;)f#hu$0H_Rh{&j#g6;f0#;o zhi+OEZBXOuY*yz_q6shs^6Kj9060J~3EJ3hU4@QI(*_|ha|ayzkTw@CTu@H3B}7mM ziHi6;TlAY89oh_=)xmt_=HzTIj~vV=#jMAkEbUD1_{7B47=D-P57^Gm&P^dq1?nL^ zIa*@xf3^e2GH{qwNLQJjomHK^)7;Row!Xg4D>;yF6k(<8AAfl0$i$Qx%%FPn=1q0x z&bqqUrGhczVHt2(J1D}#tt`+24O|Oz^95L6-vqrhm0=mMkP(4IzKWU2+qa2XMP}Zw+A$n*LwR8*B`jBvga!)sf zpcjBfh4V8H94ElA#ub08a|h_%LRt0veSO(^&uj^%L=G8|0I 
zET7Nb-q_gKB!KNKEy3Y$Z*Bc)AuMftn5Xi`*X3~X=wPM5IYI&Mi6(sWh7MzhKPPWm z&*SZSovpM|n6(`YmTwL`Z4r6JdrA@7AW_UUcm}Xc1z=V2qnqFy-e1JtU#lbQ(P#TN zJJgwbVF0Mf99Q2`ihTU|5g=@t{Z=t~MJ2ec7{Kof3k$=;!v7rC$ZW(_>(NF}VkW};@`sWj3^i2G{Vho<_P5xg0*By#t!jbIp=HA|3 z{xkp)uthi|Tv@Ua0J_9sTvAea(!YDH=R5BMB-7E+`G(

H3W8==;RP1YpRKaHDwn zR0VHEVj?0i90SuWiQr9wwI0=JjK?icPEJltAn=)je` zWsv++SDIF|^qjuqIhPFTSy{~450fV&9L`A*6V0)%;od#}xKF{BK!*;gqQuA_ult>X z?4-->zZ!FMyEvR)aali!2pLyarg6(sPr9%7<5SZcdK`)Kf+uqrE+j3qUn7o6xFJ$} z==H5w)S}2Ol-Dsu-eZD*H$uFvxgv$2#Iy%s}!_~lS{`b$rl?sJ;l8#$BL0el0 zOfS4E302+f%|?sySq{s2-iv#~ST%9A@tG>Gn=9fwIk|S`y8e8r!}L3m;34zv5*xym zeF?Lf?}?8Ukz+3z$VKyt$$zA_x#=L5Z|z33t>g4MwAs;7P*nHz0V;OCrFVj0SFDWd zw*8T+^0N}UqF3aE;_wjH5y?H>9$Qx;58t`$kO~O=m4nP6M?~X}V`c+S0)1;=XhMvr z=lf^R`uLWTTGsW6UkIG2(pQ=U=j@F2C#CEgKfc9Fy~)NzO)IrBV)I_KjMz#Hh#JijxBjN?Z!sSOednfbjn%FHR8*sBAal4(=XxmO& z;dKP1fUUjQA0eUUNU_uR$=|*@TcI*$Iwg!W{*dtPg@awmVwWhht!3t{`rUhfCpxe( zC%3cWQ*Qj7?kR1h6k0Zf5=;wu!DTt$Tf|4U^rxNDd~C5j(WgQ@SXlV(-Pxg=8X9AlZ#;mVkt1v#^1YdvnZbE3 zvViVIJYA^8?zuD}e>*8qa8G5(a-NZasOplXgH?0@U01?nZQhr_z(8@>4Mg*nWM!?o z6RIoaXnvGQ{=tV&KtMoBYK#n~)SGDrr*92d&Yx15Lt9hxS6v+zIqV6Dh=`zHx+BZe z(=|8%4-0?ZpYm!;-L6Yz>B6tM$ObQ%F{A(s$D-YAYH4}(%9RBOQOL;1KtyJXJW3*| zYd)-j~{Zld;iV6?$?3GqY?;k@E6# zglAP1BCexQyM>5@qX5DXdU-0=x6oVfKhoau&{7jFX|$_ebAqq2+NgGUJecp2qN6OH$ixuET=dW>@D=h$?1Y=FIi)WrLmyscaD~z1LwrrzR&YYFr^&w105GQH~VA%myJt)L<;%{^)S8 zyD87cyYDl1{(Qkl5zX*ED6=j zQ-r@0YuvX_uu4Q+y7b0WI7T?S1NYW*0o{w+`V_c)yT4)M6A%y}CMs&bIrpkefu5b+ z8a99s?)t;Rq=&*v{*3=R&y2>90k9kR+8s5!@f)9_fx zZaK90uBui9<48jh`48f*gt$E#P38QtRG0_u$B+AsiN0tRnGNhlfu2Crij}CP#{*C= zfF#)KxknA6Wrlt6+4JX^fE$9peEF&81bP%RvbRfeza*cgjYN?+hhZk>ft_r=DcgrF zeSV|hSq<%COPu+ZwwDiwT}%4YGy1#wqy*jYjE!GRLElUnI`bWoh3$6oxB9Qy;vvi9M$4QJ-_3VEMLUuw z0o;3gDGP4M!R9XWdC9H>I()?TdhX`!{}{ZFVcxX?28z8@D?QqY4butvVOJRV(u2+k zGb6c9jIi!&HZ3ikirv!jRX${|GFU9yyum3{=IynYzI6|7T>JYaQG%kPdE&kZACh*h zm|QtU1Gg;9*iJq1@~0AUZ@7F(Bn3{4@K+iFt#}>dBZMgDoL>ZaUvCOio1Cuxjfo>) zDq~_hrfYA}CfzkLFvk9oL6hPq*@p08EPlz5<*9DG{!yx#0l%m3Juy$5A8SI|Gm*Wa z0#8|y@ONo3Nn$p7(wQsxt5aBK#>WNDpZKD1nc7j_8h*#r$4&x9^$KD4*ScP~FJmt` zxJfuAB$Sh!zRuFQZl|UsKc)4KJi&A-qhtspPnkD~$;x&kaVsh5e;OMM&TQH-?2F_# zj$lNBP1(EFm>*4ca$0z z2=YWq@@^-C_X#4(8|c&M&=JVgWAexSC&t>r!7ej{1^?|i#^@uM^z`0-+UFLtV^uM> zpFB^n?GxD)Mh4yzXYcuw`Iuj~oYxoj*zdJZx`u1*ME&9PTeNMhB4dRTT*Vp@0qFit 
zFRHDKwHh^3kgV(xUps@Ap=+X??ecJf3449r4b~z;0jIJ?F4>r-Ct%A+Qmeia6=)qCDZLwv7m)Luk?@fLcR=4Xc=TETuR}p!Fo6#d#3S2*g zLApge$l}hPbn>kC2j5-XMl6{ABVKdnjjCFvskj4YW@K@R&inF!m2n^r)_RG4;Ajh^ z)TiIfXEbf>bWf*Dg70r%K1PjfBjfgvAm=X{*M2YHYc=&s?Skj zXL6?^lyL-v_5FQQWl89%ie(Re`t&1@u34ny2j{%MQL!a%+eIgxomV?xhMo`?F-EW( zahXqfs%QFs{3xC3;=BVJFamu1eg*JQ2V1C}9z_i{=Vhmz5lGN*Z$l8jAX@*~tHHsx zw627vkR&leb-vnmV`mrQlPQe9Os`Qs+|Zy05r@l{FGD0Rf1k?gd8&ZeOFzJTbp| zvxfcp_3OS^FeW(!9I~AM?0@)hF-k(msUDydyWwQ5H#!iL+5V~3-2_ngmjCS34*L?E=zNl!&4EF*;v`RW8 zr=}V>#(BPf{|tP2rboJ3k@?~7nsHc}0&LkK#J||BEejt2 zBQu55J-xk0Giz6V`yFvllfIxIEf;ZE4y|HG7Qb(u>Qh+mNUE0Qm%Ls@hHv`3LLQfi zLy#B-d(*2XUq4%kDCEY_ECmZoE<`xIIUq-54auyEy>Rp%Tq~|aNkv8f;jRAZmc20x z5s{Ap0XM*tDnQ`J?ROjw&b&(?In7VYyQ6y(deSO}Wsg*+r{rb{4HvGDAMcp)dynIC zJdQuU@aTAJON!&G_najc8X_{#(yp(q6}@i$fK9wyaeP#(`r<)X_M5ubZ{E0UumSKL zD-5g8xpVz`6{86|j1_VZnm!e0H8eE9v72LfY>rVFZ75E!jp?$;V^cGci4_^(gGhl`EyC&SrMsDMcV@qFQXJH|hNs zQsX;ZgAKs22`r!m^|Ac-sIOn5>U^lkFgVJ+UqrAkfevF|zT5z*2ncSuPFp@($6m1IXq@$fV7G2E)`sR5^R<(|jiZsk#uXm)M;T-bCVj7#R}e zP39~WS`ew7Tv-SL@N%w58$=EH8OSQWN@9;#SkQm}voc7fa9T@K(^Q4*PA73dbK;~I z;XtAoAp%pGT4kST$Yz6-`2njvwcWdCT!vRKRuhbc422f&xqPKd#Mbkt1y{Y8o|tyH zsdKCc>s~N9&}k@?+pVD7#!yO*?B3Z`;be4xUro@?x^*D03AB8jvqv_0I%@!T;iQY22CI6;6_S?+YHV^y3y_0Nmdti<)NyFayMlE=isQ38& z>KztOJT6NX+Jb#Ju`s$aVf?>B76(;=!)U>QXIPp)+Aok59=?6lojY~z#Ds;vI7;q5 z9N|VulJrdJq%?UYe7K0Rxtgq|a_hZu*A~g3)5MMV^MAkk|@>WdW$->(iM#76<2P|kvf?`h@t+)2pIMB^;Exn_ zV8*i0O79gX;Q~)*lI8Lcn)s8fd8HLiTT9XI9~&zwfBAv=@JH|E6qE1~G;Z{6oj;#$ z8#7$^(PcGHfX6$`tt6>-5)%4d9B(utV>Tx&s2!g8P*F}^iT)<)GM<+V4NocM*psbj z3HnnmfS|5UX8l|8_m=_)PDE44SVSc;wKAY5%AL~MA}1fH^jQcMW8sA-N=4-r5`*79 znjkDNG)|#vTR_QpB5u>Ah18VJ0^?-9@bArBsf-N;a*9~wX;e?#gcwSlTG9toj8O|i z$E}q^6rNarB@ptt%;D3G?fUs-G;D7)Z!kQXKkrE&$e1A~R2bI~ zn0YGW^G54fa&Fc}12%r;#~ADUqXpbd+g9L#_M^l)5`}T>U%4a{Snm2kGE{=8%3uP`1)BSFJ#TOg<>%z%AXCSrA zq-XNeK@AB)TJ^=S;=_r|>@Hd=C&<~`5~*IK9<*BjTucEL!0g{E%$*e-MXJ=lM5iz| zE64>l+-#EU)3_XKY0mDj`}}FF57ML$w_>QLFQo8<3{sg#*Z4RlcjI8cyLw?53q9iU 
z(d{JZSm`Fx(FYZGdyJ4y``G%#J^%SWON97r#h=7=(|7eRy6|Tk0hD|QKqnEc!t})DMb%l9I=n-8WP3--- zkVSgbQLSpVD4=#?Z+;KT6B1|{9_Mc>`O_Yh7XPS}Q8kD-gH)gmG$u_9PS=9g&f!V^1KXbt-v7aAkE^>FV3*rw$!mzPp<{HMz(;~vM# zgb!#r4oz+so0fa$^zC*ngL7tP%y#+Lf0C$G#SMDjNwBbfz>M+LyI;)Qd>t||vlA1K zfKnqVDLJ$adC5CmmfzRvA!V+gr$R2^?0f`?Ej1(((lvB$=s?u&q_0u+>({R#m(Z_+ zVJgd+Yh+;8$@j%fR5mi{ds>##iR@I;rEZp)KwN@!BAOec-5RNve(zvsMFTP|vXZ<#TM%aZ%AvpFZXA#dpm^%FD_QuyLG__jLpiNC4OZ>*z)L z(Vj5GL9SfBytcEW!+)j{8zRRML{{28%0)D$h zUxMP3P1BW^ydTeC7@HE)(9z{aHMXO+!otGx**p8Umxj{&Uf-vryn5^5%bHIF097FA zw9uCsgZC{ft6i5hLpis$rNyW}>s7BTHHNFU-`ZsP#JQ5`4Jy_ThFW0i*oQJIWT=C3 zB0Q#uzhf>f5^z4$Tzv@|Fsx?uOY_k{Fc6OP_4S>3XDZ*GyLRmwI?;vZ3;a3Qft+hs zTiXh7mzasXfsA8qW`@r3`uLQaj#o`J1cWdmQx2yiOF+{3TDb#;kHS6+QE=xC-0~KJ zl+W8O3+E=6?k4jbbLajJ({)cSDkM@}GdWfEGP#>yx{d2&&akq>=A&4j5&HrXuBw5i zlC@Y@+{-5^C1*({-E#JCQq+c!;N&PjYJ9-RxD091e8#?>9`SJ22K%~tU@CC|i2}$1 z5YU7V7J0GFN_0d-L}cWhYu7q<2_T`WqZ&*}!^G6q(eVW$c#wS)6%*4^esE=Y(A>hp zz`($yp;~P1=lPBK$>K;$kDRkbDTw@f{x;VP=?Ek*k;=u;cg7ZTy-jh?LYI^;OcQ)7 zeTRGTZwGyWdyzV9_bQY9TaSK=c5a><#G&_gch#8D@wcO+qt=j$(T?isYU^*+kSJGC zYK!3~BO=ltnOj-WP*Jh3@v@xntjUjziOG~n21Xu0e8`p!zI^EB=7x8{*hnS4_as9y zt+31Dc&KzVYTFey8D9EA-tOb%^f9;YHON&Ad~UKL;_% z5x}JKqTnrlbP95fu)YUnP)4$pD$hh9D`lmqAW1}caX<0I~HTc^R+ z1K|mr4{tJl^~^pjatMFJy6um`SoP>uP*B@77VGH{m`<<=_dhEpo>%C2u9(tICMmD5 zfHm)Oht*<-eY;eJ4+$9J_Q=H^AyUWHpTG&BqM`~34BQ0TKwqCCCh#IkfLE5TpeiMG zx5<36;n}-)?|@vl1#FYO{e5j6onw^7@pZ(Dut!?0k(_VezD-C-fc4j_au%bo80%mg zpRE54r;7~i`4x_M&J_z{x+7?YhhH%K2_lagrr0kab0!3jY<$-pnTeO4kpaQ*tP$3G z@TLP1XL0{{Si!Zj_pgxuSGaPF4tN9*7GKL#&Ek+iO)}l1pqPBXpRVCZKG}&o#u3Sr z*3i-(NDz)%Am&f8v}ASI6+N?br+5>Gl&gyOuB+1(ocZ5=68J5(FCQwky*PZ3ha5Zj z#Ot(0d(TY@;@EQ6$lW3$h+&75;g_#vA-L}RR}0E^y%8FO!dIu;*yg70fqS^R6g(SedZh9pvYU{DpzW zIhA=${duX>X1q@`WYLXjUt{45VJiwAeJQCQb#>R+nac;EmknsGOswv>vHpNcuZ?9%nM;4Y!}F^r!&eJg z%!|35)2fH9oYE-Mpu1ngvC732%Hv3RY+c-1_Rhn16B83QBj{nb?Zz6Z-!m1z2hQ{wB~)%RR2|F- z4^J+ROFTXyELSo%3%yyNa?$PdftGkwd;ctTEFa%mft3BDzhCf?a9%4it8oa=c}?z? 
z{3f%$#*C0ro$}^MUtVfl(g^=HP$8bL6zJpAS-&RTr9loV0Ns44v6oDhd(0^x<@EGs z)dblhhs9eLTin-YPBeBq=;S7on8{uH^OgR!u4hNy65 zxr{>Q!%XD&3~E9|s6Fnd{g<3!e+5<|7RoN-V48%77+*+n{5`MkD}L{E)(3=fW-IpI zv9El?L+JMU!bUjQWp~`8+5?v#h{)*t@k0zCDhxQzHC*aR@)bqY zmm8@?M-g*94nG;US_{yHq9P+_TB9QT{2l^H6cS$jq(m&*pD9Ipr$2xGyt`{l?lxc; z&2BUZ5QFr_jmf4SLJY7t@y4D8yN2b(X#$w97C7%N#08zmdCEoX#S1maKI6TvbUg!% za1&kK_Ib-*kI1MfqiS~J;m2By_~gHUS||MZmKhMsvSTom?w79qSgvGO4h~{uz{h>D zy!A^K8mo*6$r%`Jc%7}wN;IVZ^5sioNku7wbjrKdn1a%ErVhF~3Ax%97}KVuUUXZosvxI zy}hVt<3<}l*|lqH;AmANcTQ)kn z)HT(ww~4fY)@NnKCS939JL=Lh-^{FxIWUwtQC`Pe<&Y2`&pM0i2ZU_M$Ii{wmjjRJ zcbm^M`wcoex{^kc-z|MY0J0l2bJB1A16A5UE<>}8jd~zdm%lhZF0HQaT(b)#_`%?f zjg34NBS547Zw$3KwaF1-Kv*#?CqwP_5S@`~SRh`11I~VTf1jQ2-kCd{Z~1BdrW)0z zrL`=Mr~&QYzkeTr8PEW<(BIS3J-gB({(!-4>PT$*^KI?yw6wKVLU4g-TJ!N5MnQhO zau@1|4H<|}i&VpB?lP7`leo6K(#De6j0S%wYB;e`Qc?n-08ZXb1pErj-5H>D5+Dr6 z%AM-{$gj9pu{*;q1L)UV8yihcO~kO9DkqNNf5)c!kp{CXxq|)-SYd_0_BF6FS zaG^j-hC+&5;?p<^b(h%>$Ioxyz8%?n31r?|NnW;h`|5{?iSla^bP{H2WuGv~VfQxR zRjDC>SganLJh)FTAjY3Wr7{0-ofNF^@0*Da&SG%9x8N?%+}&j}dR2~B)txTcon+7e z&ITMfu&Da9Qzk3qLR&<2bk+)l;}Ur!PmN3pwtG^dq?}e{J|_8vJlw-s55A=#^q2rD zWM#f-B5XntCezXdiYAyA&dnM0ITBThoB=lHmNJyp~sDyZT&i=Do2Pv5;Tzu27wuK4D-nx4ihuCR0}o+q&&jtf zlXP%|wxAHisw~gJp_;AO`t|?}Ue$22%~U5-J%|JhXg_lO&~AYt6=+Qm#Ux@QHy><5 z8xnAii(Ju}B_|hUN_J@`fDa(4c5up(Nbk80LIxj@ zm03+jFJ-Kf0#HCduXBX?3sb4+zX2>?kGYe|`9kNZILwAj59pzgr{s%ejyx4sm|&RV_$jmh`^DTc9TB3D}mmB;VpLgp)$Mp0-VZ;xP1Q1QmoZbH4dsI2)!vs#d?O!`oYYhJ6ZB8iAvXJ`39ms z{al%LDFJ?u$A3BkueiHY}IHclTz zPHY6QeH_Cgcax;snH|Nv%@cY$I4v?xnbjFf@zMK$%VtTW$8{${R%uq03K89fvs^y*OnnHx%JqP)DCetoCv}Y>^D2yHA4ma`T0R^@RZQ?>Xgv+n=1mjY%s>_+Fil2ko7xwx+UHy#(s;t7u{Lw2I6O?TUw&JAz0 zoFUqd!E){)dJYOyh(CIHMKHhl5Eu7GN=gcBZGgW&P-W9_(Xkr|9pq@K-3GAR4O=K5 zKFnJ4f}Oj&o7?WeL2isIO3`%RvUgaAb%4+fZ)Ww(7qUz=GBJHav$R+Gy*R|C`42D8^A!Z+ij^&M1yucwa(B%xPN;q88uC{NZmeB%=|*I^QamG*JT-ohlkhGtS3*`s`9lV8%_BeHG=KVZ1@*`9A0gNe zcx1r!n=z(&f#ia?yNvmqvu)f6FaWt9iSCiP6JPZ3??ecD{hgVb2DtDDx5 
zJgH4t*EG$I@UOT-34)gwmNd|kE6GV*~|PrTG!`HhHMO5s(b^O_cyyq%I|DHca`#^B6h%mAb~*71HZaY66d(B5a+e>6OG&pIfpUX%lOBwmqg#D@Q0g9rNG zlR0m4OE?|531V6#jeZfUK-<&Rq(sWIza__i4az62cuU3b$9C%e8l#dGgz6gxR||^D zm?smPO5#=4;48S=by_`)imlPZKJmz$!oJhT8@Ij56)|6tK76#UL52n?C$8=p)~KA3 zN}?Y<HSV=vWcI%gVsYCAS^45j*M22NoTsvUN8uepT4i zu<+>rq&meSjC=1dq+rL;wrB@=dCg>p*n5+pk=u#9u1DKGi3m5mR)s&9he|S2p%O;- z(&fT%4bJd$J=KCcB!kF^@H9CZhBzvr^!;#j4*ZDgF~`~Aha$%u&+<0t$lX@;rr9<| zzSckgH^6QAV5j-RjT`cuVF*lFP0MZvu~T<9@gQ?Z2H zI`EsZeMjl0JeEC46~kRrj27l@(*&dS*D%e+Q7cW`Lbwmq;NcrLimd76L1>3HX2$1r zaYT+I69)4YFv&}ucr~1pjK+@8V+XM;ra}a1vujdLc^Fs5?ZCsXM` zrmZk~v7xQ^s6o}6)2A5C8Y#jl9Nh6_qz;DoRJy84?xVGcwHCSfq^rd8;rMF(5S#Ee zEk1&1PfB2TM-ELc)0iB_Z2yH*_9uKDMS#E@pI=(5;@*FI4!sOO*8L&b`OE#2JIRmQ zFv&lU9Bl1rvY{J=6;i@|yP&qzHaaCkJ=sEvhbOP?jif(}9l!%%ret3Oe`h?z^S9(n zW2#Go#RTZ54kclR)Iv%F1FLrgR8l?d`p;cMO(x%iTNO8Eg4tfc zNfiV{yeG&`m%?>~#BrC@%07kOEm=&Ud*bm~nDE^=D23Rqmf?bPe&} zaDizx$)7o--ALUPFTwGJ(bt2e+tn-RScfaWIB~WTJ$QwDqFc8@BO+4!P?@(B)YRxj z{4+B%|37IOa2q5ai0|o5IHj=Ex#1=Hlzj^}x{C)5J?L<>$%gdC|Fnb8@$wP_dOFle zi$Cas)Cq!fbXAZ11@Em%34&$9sRC&i#s)m5OqEhq$axfrUhN zepGMMRsA7(m7msDqN=y^dT8dpTjI##daNm=7jWr&rKWK`3(3>rl3o1ns^m97%JlfE zwVye*ryEAmGTD5?ct2x)_ub25Z5nN2>NhqVIDL<7*GuJEo=d4^_=}xc7YsX5Yi`}S zULi`61r4f537uo}3GHkp$hVN%{ywww3ebhQc@Ew^8k%INkpA`!yzC}_MPgztOTJ1v zq>qTaKX`Fz0-38pbA6ISslq8Bf0KgTjp7mQOFJKS+LZkKU=-h3hybb5-%KQYm~igJ z{H^l=4O2(MT}WdJNE5I}v{>{H4ia6vX4VA8mJ|v?*Y2p7*|R}KA+OJ;Px>Y%71M{n zaoTo?fkHUa-{})R7MaF-S~Ey*?>@x@y@*UK@_pw`&57bM(<0&BE+p+_t%P^#Yo5D@73AsWugm}USYpxQ7gA>lH)L^KcQo7&Gu z(=-$mZ{TExj*gC{r6s`N8e{7?!)6Kd8)q4#2@(CvlB~=8Pw&F28l|u(QhRpj4@J(m zpp;>@ecBwm)Uj(|Ah|<#f6Ny}%*M{1pO**uMpJ2z&jW@441K)3Hhc><3iyxrW=x#N zO9yB|C}&F(WHpdK?|)D=GD=s^oV}#?QuI=2Co^VCOH1|ZF;C%R+Iz-3B9F)gbakl} zuZ=21mx->K#o|>ZkZ~~DWI9j$K^xHDM%S;sbYm%qU@Q;(rdtRWfcLPF+2qncW0#qF zG{RR`u2$6Z0Jk%R#d+&JCqFtK0Ne22RyxO%%F`N6Zg|_fZXP&?Z=b;5KZHhNK0J%0 z_==@=>g0 zNwMSI2)rt7+wR>V-hU>(blC;h<>gqmx^`Y8+tasAy%OSKXs3ww3@A|z3GDBOUU*fc 
z2j{Ba;%Z&7egT$d*y%9ls&mFTqDMzk0t@cHV6%RUJBYsXie3GB2JC%lNr-IhFjZxt+2foV7ya)`vCV%rS8ph>1$f(&C* zG}kdF+O7eJP*==%{3v%sMea6Pe);2p3mM+`6U#LIajA5btc8tjHs;auX_xDW8QXQv zXFr8cmh1TEylcB3KOL&=b=bIz)mK&39qOxYR@_2+VDLh=$i1oC-b(8KEa-`(&#>QoD!3$Q_wwudtij2s_}rkmtWFJjZ0bMHF~Gg+^cwWhv+SDr#^`)` ztmPKnnIt}Z#N)aVjy)s5NFx z>k&fkWnp9oCj98#>WSR7=J^8{6|Ruq!IfZJZcl0V(3&x81108HVst5Itoc*^8)m) z?PRPAV@pKgDdKCsEojJg(uD*jz8kA{MR--rX6xNMKG)DH{k)RVdRPma zFoiFIlk@k6c@gDD9hR#nQ+w@w-nC(IEtH}d)v+w*c&D$|h`nuhne=~DdiYQ{VZQ>4 z+z^-FxOwR5U3d*zo%#43>xdE=fqx0ssL=4Foq; z6uX3U=te%$`sA=^`nWeu8Bh$u6KExkjiNThd+SY12mAXsnba4ZpC@&h{dDXr(#Q-T zB*8<}*9z1cI`ca^(*K?A(uvjaYB*U&F0}`2CDfKPF)?uvc`tjMJ%|2`Oag7BIh?I& zBMHa?aOQ%NQs_ZeULG4GBO~pn1NyzvbsbV!M>BcMS1gpoE+r99Kzw&gcMgQOxVU)4 zZbPv92jNT@oz}wzMPpU^U?|JE%fQgGp(`fl)uYdAIVS}h;vW5J{k?Bub-waP zw%;baQa^pYyM>DQeh=DPwI>F*5uGiXu&}&*w9g5ISLRz+AzbVf8WVHeNSA4gSyWvHuzV_J?c19g1kvxB|8 zii*kt3RU#|S?%lWA9;COW80G*Bi*mnE94&KL9Bkc?&fF7W6qF^<*CWZ4aNlqtVG`` z>myGbDA|Jmk<30^D`F_Ow=Nk$B2-jWDem1f<{WPfq60Dy+E#H99&EU|GiQV;d1=MZsJ2*k2({_lf*igy6)y%Xq{EeRLuAA80Jw&hZ z?PJNc^A|LZ`TBHOUohPYljW)m^jU%Y%GK5H1%sYHUsBW1m}MG9a#<~G^=VX0y8*KY zV*2(t1;!%_x08~RW~Zhw$>AiHJfOJ1!3pF7Fqd#n1B2vsQqtwyLx%Yf@j5EF{-J;- zNHn#_UTj7SD@LwY{W_xGdDqiy$XVtc+1%V5CKeXzcoO%e+lzS&S+4SmBkso^GQ8nf zlDnYO0Dt~Qa>}@J4F_^sA~@)R8WR&Agfv+VX^{GV`H~G4V9?0+_BN!zY#4rKS?yJL z(c3}Vo!4LGe%a;7s zul+tKbe{S{OtwrY5kgO|7L!Y-->8Z90$}9SZKiEstv2jLNrW)y6`R$Oqqh|B_h(Z_ zMKnRvNr3<8}X6FRJvs2h&-8P+($cxcM0l z+j#Ln*4Fkhp;bK`xdLZD5fO>vubM~C{s^iJhMX^qcJ=gV<*78> zp%_<1CUsr7bm_*06T~~Ly!hWl`fsEr8%qxQXhE+c)Bc;V{=3LrUHm;U-uL$O$g8RG zIBXjRi3$>}Dhdh;LNzcp7M2*CNA~$MCEjiVWZEFuHCkx)2DnIYwi#F^&9nU0zz>1r zpx_La0O}9tz?D8n|F5Nm7fM`u^zUQurowv7`|-mNj$Qfr?gE`cTGoJ}=GkM2mgNGC z2uOM98Wp3Ok4SiI{a*a8`+xSP?Dy^b{eMrY&lg|?HVtym&N9v4`xSWRgGu0l{?O3S zrOTIlPZALpo;+hlM0mKn+6Q~!p$@=fW`N-ts3D@i=fff3=<`XIx;MZja5I6!la?3U z9(=x%WvY1KeeT^|SB>+svUWY6SABxzUpjDR+TZ45OLYx!6De@_=48%4SC@KEud1$I z?laR#uuL`2y)|X}boJ?ISKmO(A<-p<{xOW9u z>;jK*0G4mSPIg$B*hye6Nr?FWdg|P{dx2YGfhSO1YS_PO{d#*~fB*~WRsrBdfY;JV 
zJ`d`F-5f4%?z@k#UAwktkIgC$V6JT24cre6JgVzFdpNKK09-lN!CH4L!(`g@>7dgx zfE7N_n)Q3XNzG)CFfy8c+7)=lUrzyWnDg%4yTI<48W%K!{Ok7x)m6}3?Ka8rT~fK# zT%g5!_wEJuq4Ks{&-GJmiP4*`;<++pRfyKssIzwsKq->vf%SXojZtfXHOBpCn@700l`}w=H0LM-%}g DXO: + current_round = fl_ctx.get_prop(AppConstants.CURRENT_ROUND) + if current_round == 0: + # First round, collect the information regarding n_feature and n_cluster + # Initialize the aggregated center and count to all zero + client_0 = list(self.collection.keys())[0] + self.n_cluster = self.collection[client_0]["center"].shape[0] + n_feature = self.collection[client_0]["center"].shape[1] + self.center = np.zeros([self.n_cluster, n_feature]) + self.count = np.zeros([self.n_cluster]) + # perform one round of KMeans over the submitted centers + # to be used as the original center points + # no count for this round + center_collect = [] + for _, record in self.collection.items(): + center_collect.append(record["center"]) + centers = np.concatenate(center_collect) + kmeans_center_initial = KMeans(n_clusters=self.n_cluster) + kmeans_center_initial.fit(centers) + self.center = kmeans_center_initial.cluster_centers_ + else: + # Mini-batch k-Means step to assemble the received centers + for center_idx in range(self.n_cluster): + centers_global_rescale = self.center[center_idx] * self.count[center_idx] + # Aggregate center, add new center to previous estimate, weighted by counts + for _, record in self.collection.items(): + centers_global_rescale += record["center"][center_idx] * record["count"][center_idx] + self.count[center_idx] += record["count"][center_idx] + # Rescale to compute mean of all points (old and new combined) + alpha = 1 / self.count[center_idx] + centers_global_rescale *= alpha + # Update the global center + self.center[center_idx] = centers_global_rescale + params = {"center": self.center} + dxo = DXO(data_kind=self.expected_data_kind, data=params) + + return dxo diff --git 
a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_learner.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_learner.py new file mode 100644 index 0000000000..61c96a5abe --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_learner.py @@ -0,0 +1,116 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import Optional, Tuple + +from sklearn.cluster import KMeans, MiniBatchKMeans, kmeans_plusplus +from sklearn.metrics import homogeneity_score + +from nvflare.apis.fl_context import FLContext +from nvflare.app_common.abstract.learner_spec import Learner +from nvflare.app_opt.sklearn.data_loader import load_data_for_range + + +class KMeansLearner(Learner): + def __init__( + self, + data_path: str, + train_start: int, + train_end: int, + valid_start: int, + valid_end: int, + random_state: int = None, + max_iter: int = 1, + n_init: int = 1, + reassignment_ratio: int = 0, + ): + super().__init__() + self.data_path = data_path + self.train_start = train_start + self.train_end = train_end + self.valid_start = valid_start + self.valid_end = valid_end + + self.random_state = random_state + self.max_iter = max_iter + self.n_init = n_init + self.reassignment_ratio = reassignment_ratio + self.train_data = None + self.valid_data = None + self.n_samples = None + self.n_clusters = None + + def load_data(self) -> dict: + train_data = load_data_for_range(self.data_path, self.train_start, self.train_end) + valid_data = load_data_for_range(self.data_path, self.valid_start, self.valid_end) + return {"train": train_data, "valid": valid_data} + + def initialize(self, parts: dict, fl_ctx: FLContext): + data = self.load_data() + self.train_data = data["train"] + self.valid_data = data["valid"] + # train data size, to be used for setting + # NUM_STEPS_CURRENT_ROUND for potential use in aggregation + self.n_samples = data["train"][-1] + # note that the model needs to be created every round + # due to the available API for center initialization + + def train(self, curr_round: int, global_param: Optional[dict], fl_ctx: FLContext) -> Tuple[dict, dict]: + # get training data, note that clustering is unsupervised + # so only x_train will be used + (x_train, y_train, train_size) = self.train_data + if curr_round == 0: + # first round, compute initial center with kmeans++ method + # 
model will be None for this round + self.n_clusters = global_param["n_clusters"] + center_local, _ = kmeans_plusplus(x_train, n_clusters=self.n_clusters, random_state=self.random_state) + kmeans = None + params = {"center": center_local, "count": None} + else: + center_global = global_param["center"] + # following rounds, local training starting from global center + kmeans = MiniBatchKMeans( + n_clusters=self.n_clusters, + batch_size=self.n_samples, + max_iter=self.max_iter, + init=center_global, + n_init=self.n_init, + reassignment_ratio=self.reassignment_ratio, + random_state=self.random_state, + ) + kmeans.fit(x_train) + center_local = kmeans.cluster_centers_ + count_local = kmeans._counts + params = {"center": center_local, "count": count_local} + return params, kmeans + + def validate(self, curr_round: int, global_param: Optional[dict], fl_ctx: FLContext) -> Tuple[dict, dict]: + # local validation with global center + # fit a standalone KMeans with just the given center + center_global = global_param["center"] + kmeans_global = KMeans(n_clusters=self.n_clusters, init=center_global, n_init=1) + kmeans_global.fit(center_global) + # get validation data, both x and y will be used + (x_valid, y_valid, valid_size) = self.valid_data + y_pred = kmeans_global.predict(x_valid) + homo = homogeneity_score(y_valid, y_pred) + self.log_info(fl_ctx, f"Homogeneity {homo:.4f}") + metrics = {"Homogeneity": homo} + return metrics, kmeans_global + + def finalize(self, fl_ctx: FLContext) -> None: + # freeing resources in finalize + del self.train_data + del self.valid_data + self.log_info(fl_ctx, "Freed training resources") diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/meta.json 
b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/meta.json new file mode 100644 index 0000000000..b3bb0520a2 --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/meta.json @@ -0,0 +1,9 @@ +{ + "name": "sklearn_kmeans", + "resource_spec": {}, + "deploy_map": { + "app": [ + "@ALL" + ] + } +} diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_fl_job.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_fl_job.py new file mode 100644 index 0000000000..37e4f4c6ce --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_fl_job.py @@ -0,0 +1,52 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nvflare.client.config import ExchangeFormat +from src.newton_raphson_workflow import FedAvgNewtonRaphson +from src.newton_raphson_persistor import NewtonRaphsonModelPersistor + +from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob +from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor +from nvflare.app_opt.sklearn.sklearn_executor import SKLearnExecutor +from nvflare.job_config.script_runner import ScriptRunner + +if __name__ == "__main__": + n_clients = 4 + num_rounds = 5 + + job = BaseFedJob( + name="logistic_regression_fedavg", + model_persistor=JoblibModelParamPersistor(n_clusters=3), + ) + + controller = FedAvgNewtonRaphson( + num_clients=n_clients, + num_rounds=num_rounds, + damping_factor=0.8, + persistor_id="newton_raphson_persistor", + ) + job.to(controller, "server") + + # Add clients + for i in range(n_clients): + runner = SKLearnExecutor( + script="src/newton_raphson_train.py", + script_args="--data_root /tmp/flare/dataset/heart_disease_data", + launch_external_process=True, + params_exchange_format=ExchangeFormat.RAW, + ) + job.to(runner, f"site-{i + 1}") + + job.export_job("/tmp/nvflare/jobs/job_config") + job.simulator_run("/tmp/nvflare/jobs/workdir", gpu="0", log_config="./log_config.json") diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt 
b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt new file mode 100644 index 0000000000..a5414b480b --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt @@ -0,0 +1,5 @@ +nvflare~=2.5.0rc +pandas +scikit-learn +joblib +tensorboard diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_assembler.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_assembler.py new file mode 100644 index 0000000000..23e6fdc62e --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_assembler.py @@ -0,0 +1,75 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict

import numpy as np
from sklearn.cluster import KMeans

from nvflare.apis.dxo import DXO, DataKind
from nvflare.apis.fl_context import FLContext
from nvflare.app_common.aggregators.assembler import Assembler
from nvflare.app_common.app_constant import AppConstants


class KMeansAssembler(Assembler):
    """Combine per-client k-means results into global cluster centers.

    Round 0 clusters the centers proposed by the clients to obtain the initial
    global centers. Every later round folds the clients' centers into the
    running global centers, weighting each by its sample count (the
    mini-batch k-means update).
    """

    def __init__(self):
        super().__init__(data_kind=DataKind.WEIGHTS)
        # Aggregator needs to keep record of historical
        # center and count information for mini-batch kmeans
        self.center = None
        self.count = None
        self.n_cluster = 0

    def get_model_params(self, dxo: DXO):
        """Return the ``center`` and ``count`` entries of a client's DXO data."""
        data = dxo.data
        return {"center": data["center"], "count": data["count"]}

    def assemble(self, data: Dict[str, dict], fl_ctx: FLContext) -> DXO:
        """Aggregate the collected client records into new global centers.

        Args:
            data: Unused here; the records are read from ``self.collection``
                (presumably the same mapping - TODO confirm against the
                aggregator that calls this method).
            fl_ctx: FL context; only ``AppConstants.CURRENT_ROUND`` is read.

        Returns:
            A DXO of the expected data kind whose data is ``{"center": ...}``.
        """
        current_round = fl_ctx.get_prop(AppConstants.CURRENT_ROUND)
        records = list(self.collection.values())
        if current_round == 0:
            # First round: take n_cluster from the first client's centers and
            # start every per-center count at zero.
            self.n_cluster = records[0]["center"].shape[0]
            self.count = np.zeros([self.n_cluster])
            # Perform one round of KMeans over the submitted centers to be
            # used as the original center points; no count for this round.
            centers = np.concatenate([record["center"] for record in records])
            kmeans_center_initial = KMeans(n_clusters=self.n_cluster)
            kmeans_center_initial.fit(centers)
            self.center = kmeans_center_initial.cluster_centers_
        else:
            # Mini-batch k-Means step to assemble the received centers
            for center_idx in range(self.n_cluster):
                # Undo the previous averaging to get the sum of all points
                # seen so far for this center.
                centers_global_rescale = self.center[center_idx] * self.count[center_idx]
                # Add each client's center, weighted by its count.
                for record in records:
                    centers_global_rescale += record["center"][center_idx] * record["count"][center_idx]
                    self.count[center_idx] += record["count"][center_idx]
                if self.count[center_idx] > 0:
                    # Rescale to the mean of all points (old and new combined).
                    alpha = 1 / self.count[center_idx]
                    centers_global_rescale *= alpha
                    self.center[center_idx] = centers_global_rescale
                # else: no sample has ever been assigned to this center.
                # Dividing by the zero count would turn the center into NaN,
                # so the previous center is kept unchanged.
        params = {"center": self.center}
        dxo = DXO(data_kind=self.expected_data_kind, data=params)

        return dxo


# (patch metadata carried over from the flattened diff: start of the next file)
# diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_learner.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_learner.py
# new file mode 100644
# index 0000000000..61c96a5abe
# --- /dev/null
# +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/src/kmeans_learner.py
# @@ -0,0 +1,116 @@
# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, Tuple

from sklearn.cluster import KMeans, MiniBatchKMeans, kmeans_plusplus
from sklearn.metrics import homogeneity_score

from nvflare.apis.fl_context import FLContext
from nvflare.app_common.abstract.learner_spec import Learner
from nvflare.app_opt.sklearn.data_loader import load_data_for_range


class KMeansLearner(Learner):
    """Client-side learner for federated k-means.

    Round 0 proposes local initial centers with k-means++; later rounds fit a
    MiniBatchKMeans model that starts from the global centers. Validation
    scores the global centers with the homogeneity score.
    """

    def __init__(
        self,
        data_path: str,
        train_start: int,
        train_end: int,
        valid_start: int,
        valid_end: int,
        random_state: int = None,
        max_iter: int = 1,
        n_init: int = 1,
        reassignment_ratio: int = 0,
    ):
        super().__init__()
        # where the data lives and which row ranges to use
        self.data_path = data_path
        self.train_start, self.train_end = train_start, train_end
        self.valid_start, self.valid_end = valid_start, valid_end

        # settings forwarded to the scikit-learn estimators
        self.random_state = random_state
        self.max_iter = max_iter
        self.n_init = n_init
        self.reassignment_ratio = reassignment_ratio

        # filled in by initialize() / train()
        self.train_data = None
        self.valid_data = None
        self.n_samples = None
        self.n_clusters = None

    def load_data(self) -> dict:
        """Load the training and validation row ranges from ``data_path``."""
        return {
            "train": load_data_for_range(self.data_path, self.train_start, self.train_end),
            "valid": load_data_for_range(self.data_path, self.valid_start, self.valid_end),
        }

    def initialize(self, parts: dict, fl_ctx: FLContext):
        """Load the data once and remember the training-set size."""
        loaded = self.load_data()
        self.train_data = loaded["train"]
        self.valid_data = loaded["valid"]
        # train data size, to be used for setting
        # NUM_STEPS_CURRENT_ROUND for potential use in aggregation
        self.n_samples = self.train_data[-1]
        # note that the model needs to be created every round
        # due to the available API for center initialization

    def train(self, curr_round: int, global_param: Optional[dict], fl_ctx: FLContext) -> Tuple[dict, dict]:
        """Run one local round; return ``(params, model)``.

        ``params`` holds ``"center"`` and ``"count"``. In round 0 the count
        and the model are both ``None``.
        """
        # clustering is unsupervised, so only the features are used
        features, _labels, _size = self.train_data

        if curr_round == 0:
            # first round: compute initial centers with the kmeans++ method
            self.n_clusters = global_param["n_clusters"]
            seed_centers, _ = kmeans_plusplus(
                features, n_clusters=self.n_clusters, random_state=self.random_state
            )
            return {"center": seed_centers, "count": None}, None

        # following rounds: local training starting from the global centers
        start_centers = global_param["center"]
        model = MiniBatchKMeans(
            n_clusters=self.n_clusters,
            batch_size=self.n_samples,
            max_iter=self.max_iter,
            init=start_centers,
            n_init=self.n_init,
            reassignment_ratio=self.reassignment_ratio,
            random_state=self.random_state,
        )
        model.fit(features)
        local_update = {"center": model.cluster_centers_, "count": model._counts}
        return local_update, model

    def validate(self, curr_round: int, global_param: Optional[dict], fl_ctx: FLContext) -> Tuple[dict, dict]:
        """Score the global centers on the validation data; return ``(metrics, model)``."""
        # fit a standalone KMeans on just the given centers, initialized at them
        reference_centers = global_param["center"]
        scorer = KMeans(n_clusters=self.n_clusters, init=reference_centers, n_init=1)
        scorer.fit(reference_centers)
        # validation uses both features and labels
        features, labels, _size = self.valid_data
        predicted = scorer.predict(features)
        homo = homogeneity_score(labels, predicted)
        self.log_info(fl_ctx, f"Homogeneity {homo:.4f}")
        return {"Homogeneity": homo}, scorer

    def finalize(self, fl_ctx: FLContext) -> None:
        """Drop the references to the loaded data."""
        del self.train_data
        del self.valid_data
        self.log_info(fl_ctx, "Freed training resources")


# (patch metadata carried over from the flattened diff: header of the next file)
# diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_data.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_data.py
# new file mode 100644
# index 0000000000..cfc12462d1
# --- /dev/null
# +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_data.py
# @@ -0,0 +1,84 @@
# Copyright (c) 2023, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os
from typing import Optional

import numpy as np
import pandas as pd
from sklearn import datasets


def load_data(dataset_name: str = "iris"):
    """Return the scikit-learn dataset called ``dataset_name``.

    Args:
        dataset_name: ``"iris"`` or ``"cancer"``.

    Raises:
        ValueError: If the name is not one of the supported datasets.
    """
    if dataset_name == "iris":
        dataset = datasets.load_iris()
    elif dataset_name == "cancer":
        dataset = datasets.load_breast_cancer()
    else:
        raise ValueError("Dataset unknown!")
    return dataset


def prepare_data(
    output_dir: str,
    dataset_name: str = "iris",
    randomize: bool = False,
    filename: Optional[str] = None,
    file_format="csv",
):
    """Write a dataset to ``output_dir`` as a header-less CSV, label first.

    Args:
        output_dir: Target folder, created if missing. An empty string means
            the current working directory.
        dataset_name: Dataset to load, see :func:`load_data`.
        randomize: Shuffle the rows with a fixed seed (0) when truthy.
        filename: Output file name; defaults to ``"<dataset_name>.csv"``.
        file_format: Only ``"csv"`` is supported.

    Raises:
        NotImplementedError: If ``file_format`` is not ``"csv"``.
        ValueError: If ``dataset_name`` is unknown.
    """
    # Reject unsupported formats before loading data or touching the disk.
    if file_format != "csv":
        raise NotImplementedError(f"file format {file_format!r} is not supported")

    # Load data
    dataset = load_data(dataset_name)
    x = dataset.data
    y = dataset.target
    if randomize:
        # A private RandomState(0) yields the same permutation as seeding the
        # global generator with 0, without altering global random state.
        idx_random = np.random.RandomState(0).permutation(len(y))
        x = x[idx_random, :]
        y = y[idx_random]

    data = np.column_stack((y, x))
    df = pd.DataFrame(data=data)

    # Make sure the target folder exists. If a non-directory entry occupies
    # the path it is removed first (os.rmdir cannot remove a plain file).
    # An empty output_dir needs no preparation.
    if output_dir:
        if os.path.exists(output_dir) and not os.path.isdir(output_dir):
            os.remove(output_dir)
        os.makedirs(output_dir, exist_ok=True)

    # Save to csv file
    filename = filename if filename else f"{dataset_name}.csv"
    file_path = os.path.join(output_dir, filename)
    df.to_csv(file_path, sep=",", index=False, header=False)


def main():
    """Command-line entry point: load a dataset and save it to ``--out_path``."""
    parser = argparse.ArgumentParser(description="Load sklearn data and save to csv")
    parser.add_argument(
        "--dataset_name", type=str, default="iris", choices=["iris", "cancer"], help="Dataset name"
    )
    parser.add_argument("--randomize", type=int, default=0, help="Whether to randomize data sequence")
    # Without an output path there is nothing to do, so fail at parse time
    # instead of crashing inside os.path.dirname(None).
    parser.add_argument("--out_path", type=str, required=True, help="Path to output data file")
    args = parser.parse_args()

    output_dir = os.path.dirname(args.out_path)
    filename = os.path.basename(args.out_path)
    prepare_data(output_dir, args.dataset_name, bool(args.randomize), filename)


if __name__ == "__main__":
    main()

# (patch metadata carried over from the flattened diff; continues on the next line)
# diff --git
a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_job_config.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_job_config.py new file mode 100644 index 0000000000..0233499d6e --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_job_config.py @@ -0,0 +1,293 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 

import argparse
import json
import os
import pathlib
import shutil
from enum import Enum
from typing import List

import numpy as np

from nvflare.apis.fl_constant import JobConstants

JOBS_ROOT = "jobs"


class SplitMethod(Enum):
    """How the training rows are distributed over the sites."""

    UNIFORM = "uniform"
    LINEAR = "linear"
    SQUARE = "square"
    EXPONENTIAL = "exponential"


def job_config_args_parser():
    """Build the command-line parser for the job-config generator."""
    parser = argparse.ArgumentParser(description="generate train configs with data split")
    # These four have no usable default; requiring them gives a clear usage
    # error instead of a failure on None further down.
    parser.add_argument("--task_name", type=str, required=True, help="Task name for the config")
    parser.add_argument("--data_path", type=str, required=True, help="Path to data file")
    parser.add_argument("--site_num", type=int, required=True, help="Total number of sites")
    parser.add_argument("--site_name_prefix", type=str, default="site-", help="Site name prefix")
    parser.add_argument(
        "--data_size",
        type=int,
        default=0,
        help="Total data size, use if specified, in order to use partial data. "
        "If not specified, use the full data size fetched from file.",
    )
    parser.add_argument(
        "--valid_frac",
        type=float,
        required=True,
        help="Validation fraction of the total size, N = round(total_size* valid_frac), "
        "the first N to be treated as validation data. "
        "special case valid_frac = 1, where all data will be used "
        "in validation, e.g. for evaluating unsupervised clustering with known ground truth label.",
    )
    parser.add_argument(
        "--split_method",
        type=str,
        default="uniform",
        choices=["uniform", "linear", "square", "exponential"],
        help="How to split the dataset",
    )
    return parser


def get_split_ratios(site_num: int, split_method: SplitMethod):
    """Return one unnormalized weight per site for ``split_method``.

    Raises:
        ValueError: If ``split_method`` is not a handled ``SplitMethod`` member.
    """
    if split_method == SplitMethod.UNIFORM:
        ratio_vec = np.ones(site_num)
    elif split_method == SplitMethod.LINEAR:
        ratio_vec = np.linspace(1, site_num, num=site_num)
    elif split_method == SplitMethod.SQUARE:
        ratio_vec = np.square(np.linspace(1, site_num, num=site_num))
    elif split_method == SplitMethod.EXPONENTIAL:
        ratio_vec = np.exp(np.linspace(1, site_num, num=site_num))
    else:
        raise ValueError(f"Split method {split_method.name} not implemented!")

    return ratio_vec


def split_num_proportion(n, site_num, split_method: SplitMethod) -> List[int]:
    """Split ``n`` items into ``site_num`` integer parts following the ratios.

    Every site but the last gets the floor of its share; the last site gets
    whatever is left, so the parts always sum to ``n``.
    """
    split = []
    ratio_vec = get_split_ratios(site_num, split_method)
    total = sum(ratio_vec)
    left = n
    for site in range(site_num - 1):
        x = int(n * ratio_vec[site] / total)
        left = left - x
        split.append(x)
    split.append(left)
    return split


def assign_data_index_to_sites(
    data_size: int,
    valid_fraction: float,
    num_sites: int,
    site_name_prefix: str,
    split_method: SplitMethod = SplitMethod.UNIFORM,
) -> dict:
    """Compute the row range of the validation set and of each site.

    With ``valid_fraction < 1`` the first rows form the validation set and the
    training rows follow it. With ``valid_fraction == 1`` all rows are used
    for both validation and training.

    Returns:
        ``{"valid": {"start", "end"}, "<prefix>1": {"start", "end"}, ...}``.

    Raises:
        ValueError: If ``valid_fraction`` is outside ``[0, 1]``.
    """
    if valid_fraction > 1.0:
        raise ValueError("validation percent should be less than or equal to 100% of the total data")
    if valid_fraction < 0.0:
        raise ValueError("validation percent should not be negative")

    if valid_fraction < 1.0:
        valid_size = int(round(data_size * valid_fraction, 0))
        train_size = data_size - valid_size
        train_offset = valid_size
    else:
        valid_size = data_size
        train_size = data_size
        train_offset = 0

    site_sizes = split_num_proportion(train_size, num_sites, split_method)
    split_data_indices = {
        "valid": {"start": 0, "end": valid_size},
    }
    # Walk through the sites with a running offset instead of re-summing the
    # size list for every site.
    idx_start = train_offset
    for site in range(num_sites):
        idx_end = idx_start + site_sizes[site]
        split_data_indices[f"{site_name_prefix}{site + 1}"] = {"start": idx_start, "end": idx_end}
        idx_start = idx_end

    return split_data_indices


def get_file_line_count(input_path: str) -> int:
    """Return the number of lines in the text file at ``input_path``."""
    with open(input_path, "r") as fp:
        return sum(1 for _ in fp)


def split_data(
    data_path: str,
    site_num: int,
    data_size: int,
    valid_frac: float,
    site_name_prefix: str = "site-",
    split_method: SplitMethod = SplitMethod.UNIFORM,
):
    """Compute per-site row ranges for the data file at ``data_path``.

    Args:
        data_size: Number of rows to use; ``0`` (or less) means every row of
            the file.

    Raises:
        ValueError: If ``data_size`` exceeds the number of lines in the file.
    """
    size_total_file = get_file_line_count(data_path)
    if data_size > 0:
        if data_size > size_total_file:
            raise ValueError("data_size should be less than or equal to the true data size")
        size_total = data_size
    else:
        size_total = size_total_file
    return assign_data_index_to_sites(size_total, valid_frac, site_num, site_name_prefix, split_method)


def _read_json(filename):
    """Load a JSON file; raise ValueError if it does not exist."""
    if not os.path.isfile(filename):
        raise ValueError(f"{filename} does not exist!")
    with open(filename, "r") as f:
        return json.load(f)


def _write_json(data, filename):
    """Write ``data`` to ``filename`` as indented JSON."""
    with open(filename, "w") as f:
        json.dump(data, f, indent=2)


def _get_job_name(args) -> str:
    """Job folder name: ``<task_name>_<site_num>_<split_method>``."""
    return f"{args.task_name}_{args.site_num}_{args.split_method}"


def _gen_deploy_map(num_sites: int, site_name_prefix: str) -> dict:
    """Map each generated app folder to the single site it is deployed to."""
    deploy_map = {"app_server": ["server"]}
    for i in range(1, num_sites + 1):
        deploy_map[f"app_{site_name_prefix}{i}"] = [f"{site_name_prefix}{i}"]
    return deploy_map


def _update_meta(meta: dict, args):
    """Fill job name, deploy map and min_clients into the meta dict in place."""
    meta["name"] = _get_job_name(args)
    meta["deploy_map"] = _gen_deploy_map(args.site_num, args.site_name_prefix)
    meta["min_clients"] = args.site_num


def _update_client_config(config: dict, args, site_name: str, site_indices):
    """Set data path and train/validation row ranges on the first component."""
    learner_args = config["components"][0]["args"]
    learner_args["data_path"] = args.data_path
    learner_args["train_start"] = site_indices[site_name]["start"]
    learner_args["train_end"] = site_indices[site_name]["end"]
    learner_args["valid_start"] = site_indices["valid"]["start"]
    learner_args["valid_end"] = site_indices["valid"]["end"]


def _update_server_config(config: dict, args):
    """Set the server's min_clients to the number of sites."""
    config["min_clients"] = args.site_num


def _copy_custom_files(src_job_path, src_app_name, dst_job_path, dst_app_name):
    """Copy the source app's ``custom`` folder (if any) into the target app."""
    dst_path = dst_job_path / dst_app_name / "custom"
    os.makedirs(dst_path, exist_ok=True)
    src_path = src_job_path / src_app_name / "custom"
    if os.path.isdir(src_path):
        shutil.copytree(src_path, dst_path, dirs_exist_ok=True)


def create_server_app(src_job_path, src_app_name, dst_job_path, site_name, args):
    """Create ``app_<site_name>`` with the updated server config and custom code."""
    dst_app_name = f"app_{site_name}"
    server_config = _read_json(src_job_path / src_app_name / "config" / JobConstants.SERVER_JOB_CONFIG)
    dst_config_path = dst_job_path / dst_app_name / "config"

    # make target config folders
    os.makedirs(dst_config_path, exist_ok=True)

    _update_server_config(server_config, args)
    _write_json(server_config, dst_config_path / JobConstants.SERVER_JOB_CONFIG)

    # copy custom file
    _copy_custom_files(src_job_path, src_app_name, dst_job_path, dst_app_name)


def create_client_app(src_job_path, src_app_name, dst_job_path, site_name, site_indices, args):
    """Create ``app_<site_name>`` with this site's client config and custom code."""
    dst_app_name = f"app_{site_name}"
    client_config = _read_json(src_job_path / src_app_name / "config" / JobConstants.CLIENT_JOB_CONFIG)
    dst_config_path = dst_job_path / dst_app_name / "config"

    # make target config folders
    os.makedirs(dst_config_path, exist_ok=True)

    # adjust file contents according to each job's specs
    _update_client_config(client_config, args, site_name, site_indices)
    _write_json(client_config, dst_config_path / JobConstants.CLIENT_JOB_CONFIG)

    # copy custom file
    _copy_custom_files(src_job_path, src_app_name, dst_job_path, dst_app_name)


def main():
    """Generate a job folder under ``JOBS_ROOT`` from the ``<task_name>_base`` template."""
    parser = job_config_args_parser()
    args = parser.parse_args()
    job_name = _get_job_name(args)
    src_name = args.task_name + "_base"
    src_job_path = pathlib.Path(JOBS_ROOT) / src_name

    # create a new job
    dst_job_path = pathlib.Path(JOBS_ROOT) / job_name
    os.makedirs(dst_job_path, exist_ok=True)

    # update meta
    meta_config_dst = dst_job_path / JobConstants.META_FILE
    meta_config = _read_json(src_job_path / JobConstants.META_FILE)
    _update_meta(meta_config, args)
    _write_json(meta_config, meta_config_dst)

    # create server side app
    create_server_app(
        src_job_path=src_job_path,
        src_app_name="app",
        dst_job_path=dst_job_path,
        site_name="server",
        args=args,
    )

    # generate data split; the requested method must be forwarded, otherwise
    # every job is split uniformly regardless of --split_method
    site_indices = split_data(
        args.data_path,
        args.site_num,
        args.data_size,
        args.valid_frac,
        args.site_name_prefix,
        SplitMethod(args.split_method),
    )

    # create client side app
    for i in range(1, args.site_num + 1):
        create_client_app(
            src_job_path=src_job_path,
            src_app_name="app",
            dst_job_path=dst_job_path,
            site_name=f"{args.site_name_prefix}{i}",
            site_indices=site_indices,
            args=args,
        )


if __name__ == "__main__":
    main()

# (patch metadata carried over from the flattened diff; continues on the next line)
# diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb
# index 930a53239b..094863c76e 100644
# ---
a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb @@ -1,19 +1,204 @@ { "cells": [ + { + "cell_type": "markdown", + "id": "7d7767c9", + "metadata": {}, + "source": [ + "# Federated K-Means Clustering with Scikit-learn on Iris Dataset" + ] + }, + { + "cell_type": "markdown", + "id": "f635ea04", + "metadata": {}, + "source": [ + "## Introduction to Scikit-learn, tabular data, and federated k-Means\n", + "### Scikit-learn\n", + "This example shows how to use [NVIDIA FLARE](https://nvflare.readthedocs.io/en/main/index.html) on tabular data.\n", + "It uses [Scikit-learn](https://scikit-learn.org/),\n", + "a widely used open-source machine learning library that supports supervised \n", + "and unsupervised learning.\n", + "### Tabular data\n", + "The data used in this example is tabular in a format that can be handled by [pandas](https://pandas.pydata.org/), such that:\n", + "- rows correspond to data samples\n", + "- the first column represents the label \n", + "- the other columns cover the features. \n", + "\n", + "Each client is expected to have one local data file containing both training \n", + "and validation samples. 
To load the data for each client, the following \n", + "parameters are expected by the local learner:\n", + "- data_path: string, the full path to the client's data file \n", + "- train_start: int, start row index for the training set\n", + "- train_end: int, end row index for the training set\n", + "- valid_start: int, start row index for the validation set\n", + "- valid_end: int, end row index for the validation set\n", + "\n", + "### Federated k-Means clustering\n", + "The machine learning algorithm in this example is [k-Means clustering](https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html).\n", + "\n", + "The aggregation follows the scheme defined in [Mini-batch k-Means](https://scikit-learn.org/stable/modules/generated/sklearn.cluster.MiniBatchKMeans.html). \n", + "\n", + "Under this setting, each round of federated learning can be formulated as follows:\n", + "- local training: starting from global centers, each client trains a local MiniBatchKMeans model with their own data\n", + "- global aggregation: the server collects the cluster centers and \n", + " count information from all clients, aggregates them by considering \n", + " each client's results as a mini-batch, and updates the global center and per-center counts.\n", + "\n", + "For center initialization, at the first round, each client generates its initial centers with the k-means++ method. Then, the server collects all initial centers and performs one round of k-means to generate the initial global center.\n", + "\n", + "Below we list the steps to run this example."
+ ] + }, + { + "cell_type": "markdown", + "id": "ce92018e", + "metadata": {}, + "source": [ + "## Install requirements\n", + "First, install the required packages:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e08b25db", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install -r code/requirements.txt" + ] + }, + { + "cell_type": "markdown", + "id": "31c22f7d", + "metadata": {}, + "source": [ + "## Download and prepare data\n", + "This example uses the Iris dataset available from Scikit-learn's dataset API. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e6c3b765", + "metadata": {}, + "outputs": [], + "source": [ + "%env DATASET_PATH=/tmp/nvflare/dataset/sklearn_iris.csv\n", + "! python3 ./code/utils/prepare_data.py --dataset_name iris --randomize 1 --out_path ${DATASET_PATH}" + ] + }, + { + "cell_type": "markdown", + "id": "6a1fefd8", + "metadata": {}, + "source": [ + "This will load the data, format it properly by removing the header, order \n", + "the label and feature columns, randomize the dataset, and save it to a CSV file with comma separation. \n", + "The default path is `/tmp/nvflare/dataset/sklearn_iris.csv`. \n", + "\n", + "Note that the dataset contains a label for each sample, which will not be \n", + "used for training since k-Means clustering is an unsupervised method. \n", + "The entire dataset with labels will be used for performance evaluation \n", + "based on [homogeneity_score](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.homogeneity_score.html)."
+ ] + }, + { + "cell_type": "markdown", + "id": "c980231e", + "metadata": {}, + "source": [ + "## Prepare clients' configs with proper data information \n", + "For real-world FL applications, the config JSON files are expected to be \n", + "specified by each client individually, according to their own local data path and splits for training and validation.\n", + "\n", + "In this simulated study, to efficiently generate the config files for a \n", + "study under a particular setting, we provide a script to automate the process.\n", + "\n", + "Note that manual copying and content modification can achieve the same.\n", + "\n", + "For an experiment with `K` clients, we split one dataset into `K+1` parts in a non-overlapping fashion: \n", + "`K` clients' training data and `1` common validation data.\n", + "\n", + "To simulate data imbalance among clients, we provided several options for client data splits by specifying how a client's data amount correlates with its ID number (from `1` to `K`):\n", + "- Uniform\n", + "- Linear\n", + "- Square\n", + "- Exponential\n", + "\n", + "These options can be used to simulate no data imbalance (uniform), moderate \n", + "data imbalance (linear), and high data imbalance (square for larger client \n", + "number, e.g. `K=20`, exponential for smaller client number, e.g. `K=5` as \n", + "it will be too aggressive for a larger number of clients)\n", + "\n", + "This step is performed by " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "79496f10", + "metadata": {}, + "outputs": [], + "source": [ + "%env DATASET_PATH=/tmp/nvflare/dataset/sklearn_iris.csv\n", + "! python3 ./utils/prepare_job_config.py --task_name \"sklearn_kmeans\" --data_path \"${DATASET_PATH}\" --site_num 3 --valid_frac 1 --split_method \"uniform\"" + ] + }, + { + "cell_type": "markdown", + "id": "b86d1d0a", + "metadata": {}, + "source": [ + "In this example, we experiment with 3 clients under a uniform data split." 
+ ] + }, + { + "cell_type": "markdown", + "id": "cf161c43", + "metadata": {}, + "source": [ + "## Run simulated kmeans experiment\n", + "Now that we have the job configs ready, we run the experiment using the FL Simulator." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a2a8f0ee", + "metadata": {}, + "outputs": [], + "source": [ + "! nvflare simulator ./code/jobs/sklearn_kmeans_3_uniform -w /tmp/nvflare/sklearn_kmeans_iris -n 3 -t 3" + ] + }, + { + "cell_type": "markdown", + "id": "fb48af70", + "metadata": {}, + "source": [ + "## Result visualization\n", + "Model accuracy is computed as the homogeneity score between the cluster formed and the ground truth label, which can be visualized in tensorboard." + ] + }, { "cell_type": "code", "execution_count": null, - "id": "94a3e985-bc57-4973-b43f-867cf94ced6c", + "id": "88d9f366", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "%load_ext tensorboard\n", + "%tensorboard --logdir /tmp/nvflare/sklearn_kmeans_iris" + ] } ], "metadata": { "kernelspec": { - "display_name": "nvflare_example", + "display_name": ".venv", "language": "python", - "name": "nvflare_example" + "name": "python3" }, "language_info": { "codemirror_mode": { From f3d50768ebcbbcd1b39e75c3de3cb1e5864ebbc6 Mon Sep 17 00:00:00 2001 From: Kevin Date: Thu, 13 Feb 2025 16:20:00 -0500 Subject: [PATCH 2/2] update after example updates --- .../sklearn-kmeans/sklearn_kmeans_iris.ipynb | 12 +- .../app/config/config_fed_client.json | 32 -- .../app/config/config_fed_server.json | 56 ---- .../app/custom/kmeans_assembler.py | 75 ----- .../app/custom/kmeans_learner.py | 116 ------- .../code/jobs/sklearn_kmeans_base/meta.json | 9 - .../code/kmeans_fl_job.py | 52 ---- .../code/kmeans_job.py | 240 ++++++++++++++ .../code/requirements.txt | 1 - .../code/utils/prepare_job_config.py | 293 ------------------ .../convert_kmeans_to_fl.ipynb | 60 +--- 11 files changed, 251 insertions(+), 695 deletions(-) delete mode 100755 
examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_client.json delete mode 100755 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_server.json delete mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_assembler.py delete mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_learner.py delete mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/meta.json delete mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_fl_job.py create mode 100644 
examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_job.py delete mode 100644 examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_job_config.py diff --git a/examples/advanced/sklearn-kmeans/sklearn_kmeans_iris.ipynb b/examples/advanced/sklearn-kmeans/sklearn_kmeans_iris.ipynb index fe5383f993..4076ac52cd 100644 --- a/examples/advanced/sklearn-kmeans/sklearn_kmeans_iris.ipynb +++ b/examples/advanced/sklearn-kmeans/sklearn_kmeans_iris.ipynb @@ -103,7 +103,7 @@ "id": "bd0713e2-e393-41c0-9da0-392535cf8a54", "metadata": {}, "source": [ - "## 4. Run simulated kmeans experiment\n", + "## 3. Run simulated kmeans experiment\n", "We run the federated training using NVFlare Simulator via [JobAPI](https://nvflare.readthedocs.io/en/main/programming_guide/fed_job_api.html):" ] }, @@ -124,7 +124,7 @@ "id": "913e9ee2-e993-442d-a525-d2baf92af539", "metadata": {}, "source": [ - "## 5. Result visualization\n", + "## 4. Result visualization\n", "Model accuracy is computed as the homogeneity score between the cluster formed and the ground truth label, which can be visualized in tensorboard." 
] }, @@ -140,14 +140,6 @@ "%load_ext tensorboard\n", "%tensorboard --logdir /tmp/nvflare/workspace/works/kmeans/sklearn_kmeans_uniform_3_clients" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bea9ebcd-96f5-45c8-a490-0559fab9991f", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_client.json b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_client.json deleted file mode 100755 index 6d3f382617..0000000000 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_client.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "format_version": 2, - - "executors": [ - { - "tasks": ["train"], - "executor": { - "id": "Executor", - "path": "nvflare.app_opt.sklearn.sklearn_executor.SKLearnExecutor", - "args": { - "learner_id": "kmeans_learner" - } - } - } - ], - "task_result_filters": [], - "task_data_filters": [], - "components": [ - { - "id": "kmeans_learner", - "path": "kmeans_learner.KMeansLearner", - "args": { - "data_path": "/tmp/nvflare/dataset/sklearn_iris.csv", - "train_start": 0, - "train_end": 50, - "valid_start": 0, - "valid_end": 150, - "random_state": 0 - } - } - ] -} diff --git 
a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_server.json b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_server.json deleted file mode 100755 index c1c4ffacce..0000000000 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/config/config_fed_server.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "format_version": 2, - "min_clients": 3, - "num_rounds": 5, - "server": { - "heart_beat_timeout": 600, - "task_request_interval": 0.05 - }, - "task_data_filters": [], - "task_result_filters": [], - "components": [ - { - "id": "persistor", - "path": "nvflare.app_opt.sklearn.joblib_model_param_persistor.JoblibModelParamPersistor", - "args": { - "initial_params": { - "n_clusters": 3 - } - } - }, - { - "id": "shareable_generator", - "path": "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator", - "args": {} - }, - { - "id": "aggregator", - "path": "nvflare.app_common.aggregators.collect_and_assemble_aggregator.CollectAndAssembleAggregator", - "args": { - "assembler_id" : "kmeans_assembler" - } - }, - { - "id": "kmeans_assembler", - "path": "kmeans_assembler.KMeansAssembler", - "args": {} - } - ], - "workflows": [ - { - "id": "scatter_and_gather", - "path": "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather", - "args": { - "min_clients" : "{min_clients}", - 
"num_rounds" : "{num_rounds}", - "start_round": 0, - "wait_time_after_min_received": 0, - "aggregator_id": "aggregator", - "persistor_id": "persistor", - "shareable_generator_id": "shareable_generator", - "train_task_name": "train", - "train_timeout": 0 - } - } - ] -} diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_assembler.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_assembler.py deleted file mode 100644 index 23e6fdc62e..0000000000 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_assembler.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from typing import Dict - -import numpy as np -from sklearn.cluster import KMeans - -from nvflare.apis.dxo import DXO, DataKind -from nvflare.apis.fl_context import FLContext -from nvflare.app_common.aggregators.assembler import Assembler -from nvflare.app_common.app_constant import AppConstants - - -class KMeansAssembler(Assembler): - def __init__(self): - super().__init__(data_kind=DataKind.WEIGHTS) - # Aggregator needs to keep record of historical - # center and count information for mini-batch kmeans - self.center = None - self.count = None - self.n_cluster = 0 - - def get_model_params(self, dxo: DXO): - data = dxo.data - return {"center": data["center"], "count": data["count"]} - - def assemble(self, data: Dict[str, dict], fl_ctx: FLContext) -> DXO: - current_round = fl_ctx.get_prop(AppConstants.CURRENT_ROUND) - if current_round == 0: - # First round, collect the information regarding n_feature and n_cluster - # Initialize the aggregated center and count to all zero - client_0 = list(self.collection.keys())[0] - self.n_cluster = self.collection[client_0]["center"].shape[0] - n_feature = self.collection[client_0]["center"].shape[1] - self.center = np.zeros([self.n_cluster, n_feature]) - self.count = np.zeros([self.n_cluster]) - # perform one round of KMeans over the submitted centers - # to be used as the original center points - # no count for this round - center_collect = [] - for _, record in self.collection.items(): - center_collect.append(record["center"]) - centers = np.concatenate(center_collect) - kmeans_center_initial = KMeans(n_clusters=self.n_cluster) - kmeans_center_initial.fit(centers) - self.center = kmeans_center_initial.cluster_centers_ - else: - # Mini-batch k-Means step to assemble the received centers - for center_idx in range(self.n_cluster): - centers_global_rescale = self.center[center_idx] * self.count[center_idx] - # Aggregate center, add new center to previous estimate, weighted by counts - for _, record in self.collection.items(): - 
centers_global_rescale += record["center"][center_idx] * record["count"][center_idx] - self.count[center_idx] += record["count"][center_idx] - # Rescale to compute mean of all points (old and new combined) - alpha = 1 / self.count[center_idx] - centers_global_rescale *= alpha - # Update the global center - self.center[center_idx] = centers_global_rescale - params = {"center": self.center} - dxo = DXO(data_kind=self.expected_data_kind, data=params) - - return dxo diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_learner.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_learner.py deleted file mode 100644 index 61c96a5abe..0000000000 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/app/custom/kmeans_learner.py +++ /dev/null @@ -1,116 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional, Tuple - -from sklearn.cluster import KMeans, MiniBatchKMeans, kmeans_plusplus -from sklearn.metrics import homogeneity_score - -from nvflare.apis.fl_context import FLContext -from nvflare.app_common.abstract.learner_spec import Learner -from nvflare.app_opt.sklearn.data_loader import load_data_for_range - - -class KMeansLearner(Learner): - def __init__( - self, - data_path: str, - train_start: int, - train_end: int, - valid_start: int, - valid_end: int, - random_state: int = None, - max_iter: int = 1, - n_init: int = 1, - reassignment_ratio: int = 0, - ): - super().__init__() - self.data_path = data_path - self.train_start = train_start - self.train_end = train_end - self.valid_start = valid_start - self.valid_end = valid_end - - self.random_state = random_state - self.max_iter = max_iter - self.n_init = n_init - self.reassignment_ratio = reassignment_ratio - self.train_data = None - self.valid_data = None - self.n_samples = None - self.n_clusters = None - - def load_data(self) -> dict: - train_data = load_data_for_range(self.data_path, self.train_start, self.train_end) - valid_data = load_data_for_range(self.data_path, self.valid_start, self.valid_end) - return {"train": train_data, "valid": valid_data} - - def initialize(self, parts: dict, fl_ctx: FLContext): - data = self.load_data() - self.train_data = data["train"] - self.valid_data = data["valid"] - # train data size, to be used for setting - # NUM_STEPS_CURRENT_ROUND for potential use in aggregation - self.n_samples = data["train"][-1] - # note that the model needs to be created every round - # due to the available API for center initialization - - def train(self, curr_round: int, global_param: Optional[dict], fl_ctx: FLContext) -> Tuple[dict, dict]: - # get training data, note that clustering is unsupervised - # so only x_train will be used - (x_train, y_train, train_size) 
= self.train_data - if curr_round == 0: - # first round, compute initial center with kmeans++ method - # model will be None for this round - self.n_clusters = global_param["n_clusters"] - center_local, _ = kmeans_plusplus(x_train, n_clusters=self.n_clusters, random_state=self.random_state) - kmeans = None - params = {"center": center_local, "count": None} - else: - center_global = global_param["center"] - # following rounds, local training starting from global center - kmeans = MiniBatchKMeans( - n_clusters=self.n_clusters, - batch_size=self.n_samples, - max_iter=self.max_iter, - init=center_global, - n_init=self.n_init, - reassignment_ratio=self.reassignment_ratio, - random_state=self.random_state, - ) - kmeans.fit(x_train) - center_local = kmeans.cluster_centers_ - count_local = kmeans._counts - params = {"center": center_local, "count": count_local} - return params, kmeans - - def validate(self, curr_round: int, global_param: Optional[dict], fl_ctx: FLContext) -> Tuple[dict, dict]: - # local validation with global center - # fit a standalone KMeans with just the given center - center_global = global_param["center"] - kmeans_global = KMeans(n_clusters=self.n_clusters, init=center_global, n_init=1) - kmeans_global.fit(center_global) - # get validation data, both x and y will be used - (x_valid, y_valid, valid_size) = self.valid_data - y_pred = kmeans_global.predict(x_valid) - homo = homogeneity_score(y_valid, y_pred) - self.log_info(fl_ctx, f"Homogeneity {homo:.4f}") - metrics = {"Homogeneity": homo} - return metrics, kmeans_global - - def finalize(self, fl_ctx: FLContext) -> None: - # freeing resources in finalize - del self.train_data - del self.valid_data - self.log_info(fl_ctx, "Freed training resources") diff --git 
a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/meta.json b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/meta.json deleted file mode 100644 index b3bb0520a2..0000000000 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/jobs/sklearn_kmeans_base/meta.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "name": "sklearn_kmeans", - "resource_spec": {}, - "deploy_map": { - "app": [ - "@ALL" - ] - } -} diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_fl_job.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_fl_job.py deleted file mode 100644 index 37e4f4c6ce..0000000000 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_fl_job.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from nvflare.client.config import ExchangeFormat -from src.newton_raphson_workflow import FedAvgNewtonRaphson -from src.newton_raphson_persistor import NewtonRaphsonModelPersistor - -from nvflare.app_opt.pt.job_config.base_fed_job import BaseFedJob -from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor -from nvflare.app_opt.sklearn.sklearn_executor import SKLearnExecutor -from nvflare.job_config.script_runner import ScriptRunner - -if __name__ == "__main__": - n_clients = 4 - num_rounds = 5 - - job = BaseFedJob( - name="logistic_regression_fedavg", - model_persistor=JoblibModelParamPersistor(n_clusters=3), - ) - - controller = FedAvgNewtonRaphson( - num_clients=n_clients, - num_rounds=num_rounds, - damping_factor=0.8, - persistor_id="newton_raphson_persistor", - ) - job.to(controller, "server") - - # Add clients - for i in range(n_clients): - runner = SKLearnExecutor( - script="src/newton_raphson_train.py", - script_args="--data_root /tmp/flare/dataset/heart_disease_data", - launch_external_process=True, - params_exchange_format=ExchangeFormat.RAW, - ) - job.to(runner, f"site-{i + 1}") - - job.export_job("/tmp/nvflare/jobs/job_config") - job.simulator_run("/tmp/nvflare/jobs/workdir", gpu="0", log_config="./log_config.json") diff --git 
a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_job.py b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_job.py new file mode 100644 index 0000000000..be7a31795a --- /dev/null +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/kmeans_job.py @@ -0,0 +1,240 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import os +from enum import Enum +from typing import List + +import numpy as np +from src.kmeans_assembler import KMeansAssembler +from src.kmeans_learner import KMeansLearner + +from nvflare import FedJob +from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator +from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator +from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather +from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor +from nvflare.app_opt.sklearn.sklearn_executor import SKLearnExecutor + + +class SplitMethod(Enum): + UNIFORM = "uniform" + LINEAR = "linear" + SQUARE = "square" + EXPONENTIAL = "exponential" + + +def get_split_ratios(site_num: int, split_method: SplitMethod): + if split_method == SplitMethod.UNIFORM: + ratio_vec = np.ones(site_num) + elif split_method == SplitMethod.LINEAR: + ratio_vec = np.linspace(1, site_num, num=site_num) + elif split_method == SplitMethod.SQUARE: + ratio_vec = np.square(np.linspace(1, site_num, num=site_num)) + elif split_method == SplitMethod.EXPONENTIAL: + ratio_vec = np.exp(np.linspace(1, site_num, num=site_num)) + else: + raise ValueError(f"Split method {split_method.name} not implemented!") + + return ratio_vec + + +def split_num_proportion(n, site_num, split_method: SplitMethod) -> List[int]: + split = [] + ratio_vec = get_split_ratios(site_num, split_method) + total = sum(ratio_vec) + left = n + for site in range(site_num - 1): + x = int(n * ratio_vec[site] / total) + left = left - x + split.append(x) + split.append(left) + return split + + +def assign_data_index_to_sites( + data_size: int, + valid_fraction: float, + num_sites: int, + split_method: SplitMethod = SplitMethod.UNIFORM, +) -> dict: + if valid_fraction > 1.0: + raise ValueError("validation percent should be less than or equal to 100% of the total data") + elif 
valid_fraction < 1.0: + valid_size = int(round(data_size * valid_fraction, 0)) + train_size = data_size - valid_size + else: + valid_size = data_size + train_size = data_size + + site_sizes = split_num_proportion(train_size, num_sites, split_method) + split_data_indices = { + "valid": {"start": 0, "end": valid_size}, + } + for site in range(num_sites): + site_id = site + 1 + if valid_fraction < 1.0: + idx_start = valid_size + sum(site_sizes[:site]) + idx_end = valid_size + sum(site_sizes[: site + 1]) + else: + idx_start = sum(site_sizes[:site]) + idx_end = sum(site_sizes[: site + 1]) + split_data_indices[site_id] = {"start": idx_start, "end": idx_end} + + return split_data_indices + + +def get_file_line_count(input_path: str) -> int: + count = 0 + with open(input_path, "r") as fp: + for i, _ in enumerate(fp): + count += 1 + return count + + +def split_data( + data_path: str, + num_clients: int, + valid_frac: float, + split_method: SplitMethod = SplitMethod.UNIFORM, +): + size_total_file = get_file_line_count(data_path) + site_indices = assign_data_index_to_sites(size_total_file, valid_frac, num_clients, split_method) + return site_indices + + +def define_parser(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--workspace_dir", + type=str, + default="/tmp/nvflare/workspace/works/kmeans", + help="work directory, default to '/tmp/nvflare/workspace/works/kmeans'", + ) + parser.add_argument( + "--job_dir", + type=str, + default="/tmp/nvflare/workspace/jobs/kmeans", + help="directory for job export, default to '/tmp/nvflare/workspace/jobs/kmeans'", + ) + parser.add_argument( + "--data_path", + type=str, + default="/tmp/nvflare/dataset/sklearn_iris.csv", + help="work directory, default to '/tmp/nvflare/dataset/sklearn_iris.csv'", + ) + parser.add_argument( + "--num_clients", + type=int, + default=3, + help="number of clients to simulate, default to 3", + ) + parser.add_argument( + "--num_rounds", + type=int, + default=5, + help="number of rounds, default 
to 5", + ) + parser.add_argument( + "--split_mode", + type=str, + default="uniform", + choices=["uniform", "linear", "square", "exponential"], + help="how to split data among clients", + ) + parser.add_argument( + "--valid_frac", + type=float, + default=1, + help="fraction of data to use for validation, default to perform validation on all data", + ) + return parser.parse_args() + + +def main(): + args = define_parser() + # Get args + data_path = args.data_path + num_clients = args.num_clients + num_rounds = args.num_rounds + split_mode = args.split_mode + valid_frac = args.valid_frac + job_name = f"sklearn_kmeans_{split_mode}_{num_clients}_clients" + + # Set the output workspace and job directories + workspace_dir = os.path.join(args.workspace_dir, job_name) + job_dir = args.job_dir + + # Create the FedJob + job = FedJob(name=job_name, min_clients=num_clients) + + # Define the controller workflow and send to server + controller = ScatterAndGather( + min_clients=num_clients, + num_rounds=num_rounds, + aggregator_id="aggregator", + persistor_id="persistor", + shareable_generator_id="shareable_generator", + train_task_name="train", + ) + job.to_server(controller, id="scatter_and_gather") + + # Define other server components + assembler = KMeansAssembler() + job.to_server(assembler, id="kmeans_assembler") + aggregator = CollectAndAssembleAggregator(assembler_id="kmeans_assembler") + job.to_server(aggregator, id="aggregator") + shareable_generator = FullModelShareableGenerator() + job.to_server(shareable_generator, id="shareable_generator") + persistor = JoblibModelParamPersistor( + initial_params={"n_clusters": 3}, + ) + job.to_server(persistor, id="persistor") + + # Get the data split numbers and send to each client + # generate data split + site_indices = split_data( + data_path, + num_clients, + valid_frac, + SplitMethod(split_mode), + ) + + for i in range(1, num_clients + 1): + # Define the executor and send to clients + runner = 
SKLearnExecutor(learner_id="kmeans_learner") + job.to(runner, f"site-{i}", tasks=["train"]) + + learner = KMeansLearner( + data_path=data_path, + train_start=site_indices[i]["start"], + train_end=site_indices[i]["end"], + valid_start=site_indices["valid"]["start"], + valid_end=site_indices["valid"]["end"], + random_state=0, + ) + job.to(learner, f"site-{i}", id="kmeans_learner") + + # Export the job + print("job_dir=", job_dir) + job.export_job(job_dir) + + # Run the job + print("workspace_dir=", workspace_dir) + job.simulator_run(workspace_dir) + + +if __name__ == "__main__": + main() diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt index a5414b480b..b72d5c2798 100644 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/requirements.txt @@ -1,4 +1,3 @@ -nvflare~=2.5.0rc pandas scikit-learn joblib diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_job_config.py 
b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_job_config.py deleted file mode 100644 index 0233499d6e..0000000000 --- a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/code/utils/prepare_job_config.py +++ /dev/null @@ -1,293 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import argparse -import json -import os -import pathlib -import shutil -from enum import Enum -from typing import List - -import numpy as np - -from nvflare.apis.fl_constant import JobConstants - -JOBS_ROOT = "jobs" - - -class SplitMethod(Enum): - UNIFORM = "uniform" - LINEAR = "linear" - SQUARE = "square" - EXPONENTIAL = "exponential" - - -def job_config_args_parser(): - parser = argparse.ArgumentParser(description="generate train configs with data split") - parser.add_argument("--task_name", type=str, help="Task name for the config") - parser.add_argument("--data_path", type=str, help="Path to data file") - parser.add_argument("--site_num", type=int, help="Total number of sites") - parser.add_argument("--site_name_prefix", type=str, default="site-", help="Site name prefix") - parser.add_argument( - "--data_size", - type=int, - default=0, - help="Total data size, use if specified, in order to use partial data" - "If not specified, use the full data size fetched from file.", - ) - parser.add_argument( - "--valid_frac", - type=float, - help="Validation fraction of the total size, N = round(total_size* valid_frac), " - "the first N to be treated as validation data. " - "special case valid_frac = 1, where all data will be used" - "in validation, e.g. 
for evaluating unsupervised clustering with known ground truth label.", - ) - parser.add_argument( - "--split_method", - type=str, - default="uniform", - choices=["uniform", "linear", "square", "exponential"], - help="How to split the dataset", - ) - return parser - - -def get_split_ratios(site_num: int, split_method: SplitMethod): - if split_method == SplitMethod.UNIFORM: - ratio_vec = np.ones(site_num) - elif split_method == SplitMethod.LINEAR: - ratio_vec = np.linspace(1, site_num, num=site_num) - elif split_method == SplitMethod.SQUARE: - ratio_vec = np.square(np.linspace(1, site_num, num=site_num)) - elif split_method == SplitMethod.EXPONENTIAL: - ratio_vec = np.exp(np.linspace(1, site_num, num=site_num)) - else: - raise ValueError(f"Split method {split_method.name} not implemented!") - - return ratio_vec - - -def split_num_proportion(n, site_num, split_method: SplitMethod) -> List[int]: - split = [] - ratio_vec = get_split_ratios(site_num, split_method) - total = sum(ratio_vec) - left = n - for site in range(site_num - 1): - x = int(n * ratio_vec[site] / total) - left = left - x - split.append(x) - split.append(left) - return split - - -def assign_data_index_to_sites( - data_size: int, - valid_fraction: float, - num_sites: int, - site_name_prefix: str, - split_method: SplitMethod = SplitMethod.UNIFORM, -) -> dict: - if valid_fraction > 1.0: - raise ValueError("validation percent should be less than or equal to 100% of the total data") - elif valid_fraction < 1.0: - valid_size = int(round(data_size * valid_fraction, 0)) - train_size = data_size - valid_size - else: - valid_size = data_size - train_size = data_size - - site_sizes = split_num_proportion(train_size, num_sites, split_method) - split_data_indices = { - "valid": {"start": 0, "end": valid_size}, - } - for site in range(num_sites): - site_id = site_name_prefix + str(site + 1) - if valid_fraction < 1.0: - idx_start = valid_size + sum(site_sizes[:site]) - idx_end = valid_size + sum(site_sizes[: site + 
1]) - else: - idx_start = sum(site_sizes[:site]) - idx_end = sum(site_sizes[: site + 1]) - split_data_indices[site_id] = {"start": idx_start, "end": idx_end} - - return split_data_indices - - -def get_file_line_count(input_path: str) -> int: - count = 0 - with open(input_path, "r") as fp: - for i, _ in enumerate(fp): - count += 1 - return count - - -def split_data( - data_path: str, - site_num: int, - data_size: int, - valid_frac: float, - site_name_prefix: str = "site-", - split_method: SplitMethod = SplitMethod.UNIFORM, -): - size_total_file = get_file_line_count(data_path) - if data_size > 0: - if data_size > size_total_file: - raise ValueError("data_size should be less than or equal to the true data size") - else: - size_total = data_size - else: - size_total = size_total_file - site_indices = assign_data_index_to_sites(size_total, valid_frac, site_num, site_name_prefix, split_method) - return site_indices - - -def _read_json(filename): - if not os.path.isfile(filename): - raise ValueError(f"{filename} does not exist!") - with open(filename, "r") as f: - return json.load(f) - - -def _write_json(data, filename): - with open(filename, "w") as f: - json.dump(data, f, indent=2) - - -def _get_job_name(args) -> str: - return args.task_name + "_" + str(args.site_num) + "_" + args.split_method - - -def _gen_deploy_map(num_sites: int, site_name_prefix: str) -> dict: - deploy_map = {"app_server": ["server"]} - for i in range(1, num_sites + 1): - deploy_map[f"app_{site_name_prefix}{i}"] = [f"{site_name_prefix}{i}"] - return deploy_map - - -def _update_meta(meta: dict, args): - name = _get_job_name(args) - meta["name"] = name - meta["deploy_map"] = _gen_deploy_map(args.site_num, args.site_name_prefix) - meta["min_clients"] = args.site_num - - -def _update_client_config(config: dict, args, site_name: str, site_indices): - # update client config - # data path and training/validation row indices - config["components"][0]["args"]["data_path"] = args.data_path - 
config["components"][0]["args"]["train_start"] = site_indices[site_name]["start"] - config["components"][0]["args"]["train_end"] = site_indices[site_name]["end"] - config["components"][0]["args"]["valid_start"] = site_indices["valid"]["start"] - config["components"][0]["args"]["valid_end"] = site_indices["valid"]["end"] - - -def _update_server_config(config: dict, args): - config["min_clients"] = args.site_num - - -def _copy_custom_files(src_job_path, src_app_name, dst_job_path, dst_app_name): - dst_path = dst_job_path / dst_app_name / "custom" - os.makedirs(dst_path, exist_ok=True) - src_path = src_job_path / src_app_name / "custom" - if os.path.isdir(src_path): - shutil.copytree(src_path, dst_path, dirs_exist_ok=True) - - -def create_server_app(src_job_path, src_app_name, dst_job_path, site_name, args): - dst_app_name = f"app_{site_name}" - server_config = _read_json(src_job_path / src_app_name / "config" / JobConstants.SERVER_JOB_CONFIG) - dst_config_path = dst_job_path / dst_app_name / "config" - - # make target config folders - if not os.path.exists(dst_config_path): - os.makedirs(dst_config_path) - - _update_server_config(server_config, args) - server_config_filename = dst_config_path / JobConstants.SERVER_JOB_CONFIG - _write_json(server_config, server_config_filename) - - # copy custom file - _copy_custom_files(src_job_path, src_app_name, dst_job_path, dst_app_name) - - -def create_client_app(src_job_path, src_app_name, dst_job_path, site_name, site_indices, args): - dst_app_name = f"app_{site_name}" - client_config = _read_json(src_job_path / src_app_name / "config" / JobConstants.CLIENT_JOB_CONFIG) - dst_config_path = dst_job_path / dst_app_name / "config" - - # make target config folders - if not os.path.exists(dst_config_path): - os.makedirs(dst_config_path) - - # adjust file contents according to each job's specs - _update_client_config(client_config, args, site_name, site_indices) - client_config_filename = dst_config_path / 
JobConstants.CLIENT_JOB_CONFIG - _write_json(client_config, client_config_filename) - - # copy custom file - _copy_custom_files(src_job_path, src_app_name, dst_job_path, dst_app_name) - - -def main(): - parser = job_config_args_parser() - args = parser.parse_args() - job_name = _get_job_name(args) - src_name = args.task_name + "_base" - src_job_path = pathlib.Path(JOBS_ROOT) / src_name - - # create a new job - dst_job_path = pathlib.Path(JOBS_ROOT) / job_name - if not os.path.exists(dst_job_path): - os.makedirs(dst_job_path) - - # update meta - meta_config_dst = dst_job_path / JobConstants.META_FILE - meta_config = _read_json(src_job_path / JobConstants.META_FILE) - _update_meta(meta_config, args) - _write_json(meta_config, meta_config_dst) - - # create server side app - create_server_app( - src_job_path=src_job_path, - src_app_name="app", - dst_job_path=dst_job_path, - site_name="server", - args=args, - ) - - # generate data split - site_indices = split_data( - args.data_path, - args.site_num, - args.data_size, - args.valid_frac, - args.site_name_prefix, - ) - - # create client side app - for i in range(1, args.site_num + 1): - create_client_app( - src_job_path=src_job_path, - src_app_name="app", - dst_job_path=dst_job_path, - site_name=f"{args.site_name_prefix}{i}", - site_indices=site_indices, - args=args, - ) - - -if __name__ == "__main__": - main() diff --git a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb index 094863c76e..f25d8d2af1 100644 --- 
a/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb +++ b/examples/tutorials/self-paced-training/part-1_federated_learning_introduction/chapter-2_develop_federated_learning_applications/02.3_convert_machine_learning_to_federated_learning/02.3.2_convert_kmeans_to_federated_learning/convert_kmeans_to_fl.ipynb @@ -86,7 +86,7 @@ "outputs": [], "source": [ "%env DATASET_PATH=/tmp/nvflare/dataset/sklearn_iris.csv\n", - "! python3 ./code/utils/prepare_data.py --dataset_name iris --randomize 1 --out_path ${DATASET_PATH}" + "! python3 ./code/utils/prepare_data.py --dataset_name iris --out_path ${DATASET_PATH}" ] }, { @@ -106,71 +106,29 @@ }, { "cell_type": "markdown", - "id": "c980231e", + "id": "cf161c43", "metadata": {}, "source": [ - "## Prepare clients' configs with proper data information \n", - "For real-world FL applications, the config JSON files are expected to be \n", - "specified by each client individually, according to their own local data path and splits for training and validation.\n", - "\n", - "In this simulated study, to efficiently generate the config files for a \n", - "study under a particular setting, we provide a script to automate the process.\n", - "\n", - "Note that manual copying and content modification can achieve the same.\n", - "\n", - "For an experiment with `K` clients, we split one dataset into `K+1` parts in a non-overlapping fashion: \n", - "`K` clients' training data and `1` common validation data.\n", - "\n", - "To simulate data imbalance among clients, we provided several options for client data splits by specifying how a client's data amount correlates with its ID number (from `1` to `K`):\n", - "- Uniform\n", - "- Linear\n", - "- Square\n", - "- Exponential\n", - "\n", - "These options can be used to simulate no data imbalance 
(uniform), moderate \n", - "data imbalance (linear), and high data imbalance (square for larger client \n", - "number, e.g. `K=20`, exponential for smaller client number, e.g. `K=5` as \n", - "it will be too aggressive for a larger number of clients)\n", - "\n", - "This step is performed by " + "## Run simulated kmeans experiment\n", + "We can run the federated training using the NVFlare Simulator with the JobAPI:" ] }, { "cell_type": "code", "execution_count": null, - "id": "79496f10", + "id": "a2a8f0ee", "metadata": {}, "outputs": [], "source": [ - "%env DATASET_PATH=/tmp/nvflare/dataset/sklearn_iris.csv\n", - "! python3 ./utils/prepare_job_config.py --task_name \"sklearn_kmeans\" --data_path \"${DATASET_PATH}\" --site_num 3 --valid_frac 1 --split_method \"uniform\"" + "! python kmeans_job.py --num_clients 3 --split_mode uniform" ] }, { "cell_type": "markdown", - "id": "b86d1d0a", + "id": "7b9fdb72", "metadata": {}, "source": [ - "In this example, we experiment with 3 clients under a uniform data split." - ] - }, - { - "cell_type": "markdown", - "id": "cf161c43", - "metadata": {}, - "source": [ - "## Run simulated kmeans experiment\n", - "Now that we have the job configs ready, we run the experiment using the FL Simulator." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a2a8f0ee", - "metadata": {}, - "outputs": [], - "source": [ - "! nvflare simulator ./code/jobs/sklearn_kmeans_3_uniform -w /tmp/nvflare/sklearn_kmeans_iris -n 3 -t 3" + "With the default arguments, [kmeans_job.py](code/kmeans_job.py) will export the job to `/tmp/nvflare/workspace/jobs/kmeans` and then the job will be run with a workspace directory of `/tmp/nvflare/workspace/works/kmeans`." ] }, { @@ -190,7 +148,7 @@ "outputs": [], "source": [ "%load_ext tensorboard\n", - "%tensorboard --logdir /tmp/nvflare/sklearn_kmeans_iris" + "%tensorboard --logdir /tmp/nvflare/workspace/works/kmeans/sklearn_kmeans_uniform_3_clients" ] } ],