5. Running FedLearn and Code Example

This section provides step-by-step instructions and code examples for running FedLearn in both local and collaborative modes.

5.1 Running FedLearn in Local Mode

Local mode is useful for testing and development purposes. It simulates a federated learning environment on a single machine.

Step 1: Prepare Your Data

Ensure your dataset is prepared and located in the directory specified in your config.json file.

Step 2: Configure FedLearn

Update your config.json file for local mode:

json
{
  "server_address": "localhost:8080",
  "num_rounds": 10,
  "strategy": "FedAvg",
  "grpc_max_message_length": 104857600,
  "mode": "local",
  "num_clients": 3,
  "local_epochs": 5,
  "batch_size": 32,
  "learning_rate": 0.01,
  "model_name": "simple_cnn",
  "dataset": "mnist",
  "data_dir": "./data",
  "output_dir": "./output"
}

Step 3: Run FedLearn

Create a Python script run_fedlearn_local.py:

python
from cifer import *
from imutils import paths
import numpy as np
import os
import cv2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense
from tensorflow.keras.optimizers import SGD
from keras.utils import to_categorical
import random

def load_mnist_byCid(cid):
    """Load the local-mode image dataset from ``data/standalone``.

    Every image found under the directory is read as grayscale, resized to
    256x256, given a trailing channel axis and divided by 255.0. The label
    of an image is the name of its parent directory.

    Args:
        cid: Client identifier. It is not used here (local mode always reads
            the single ``data/standalone`` directory) but is kept so that
            existing callers can keep passing it.

    Returns:
        A ``(data, labels)`` tuple of NumPy arrays. ``labels`` holds the
        parent-directory names as strings.
    """
    data = []
    labels = []

    path = "data/standalone"

    for imgpath in paths.list_images(path):
        # Load the image in grayscale mode. cv2.imread reports a failed read
        # by returning None instead of raising, so check before using it.
        img_grayscale = cv2.imread(imgpath, cv2.IMREAD_GRAYSCALE)
        if img_grayscale is None:
            print(f"Skipping unreadable image: {imgpath}")
            continue

        # Resize the image to 256x256; this can be adjusted based on user requirements
        img_resized = cv2.resize(img_grayscale, (256, 256))

        # Add the channel axis (H, W) -> (H, W, 1) and normalize
        img = np.expand_dims(img_resized, axis=-1) / 255.0

        # The label is the name of the folder that contains the image
        label = imgpath.split(os.path.sep)[-2]

        data.append(img)
        labels.append(label)

    return np.array(data), np.array(labels)

def define_model(input_shape, num_classes):
    """Build and compile the small CNN classifier used by this example.

    The network is a 3x3 convolution with 32 filters, 2x2 max-pooling, a
    flatten step, a 100-unit dense layer and a softmax output with
    ``num_classes`` units. It is compiled with SGD (learning rate 0.01,
    momentum 0.9), categorical cross-entropy loss and the accuracy metric.

    Args:
        input_shape: Shape of one input sample, passed to the first layer.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The compiled ``Sequential`` model.
    """
    layers = [
        Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=input_shape),
        MaxPool2D((2, 2)),
        Flatten(),
        Dense(100, activation='relu', kernel_initializer='he_uniform'),
        Dense(num_classes, activation='softmax'),
    ]
    network = Sequential(layers)

    network.compile(
        optimizer=SGD(learning_rate=0.01, momentum=0.9),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return network

def setWeightSingleList(weights):
    """Flatten per-layer weight arrays into one flat Python list.

    Args:
        weights: Sequence of array-likes, one entry per layer (for example
            the result of ``model.get_weights()``).

    Returns:
        A flat ``list`` containing every layer's values in order, each layer
        flattened in row-major order. An empty input yields ``[]``.
    """
    flat_layers = [np.asarray(layer).ravel() for layer in weights]
    if not flat_layers:
        # np.concatenate raises ValueError when given no arrays at all
        return []
    return np.concatenate(flat_layers).tolist()

def reshapeWeight(server_weight, client_weight):
    """Rebuild per-layer weight arrays from one flat weight sequence.

    Consecutive runs of ``server_weight`` are consumed, one run per entry of
    ``client_weight``, and each run is reshaped to that entry's shape.

    Args:
        server_weight: Flat sequence of weight values.
        client_weight: Sequence of objects exposing ``.shape``; they only
            serve as shape templates, their values are not read.

    Returns:
        A list of NumPy arrays, one per entry of ``client_weight``.

    Raises:
        ValueError: If ``server_weight`` holds too few values to fill every
            layer (raised by the reshape).
    """
    flat = np.asarray(server_weight)
    reshape_weight = []
    offset = 0
    for layer_weights in client_weight:
        # Force an integer count: np.prod(()) is the float 1.0, which would
        # be rejected as a slice bound for scalar (shape ()) layers.
        n_weights = int(np.prod(layer_weights.shape, dtype=np.int64))
        chunk = flat[offset:offset + n_weights]
        # copy() keeps the returned layers independent of each other
        reshape_weight.append(chunk.reshape(layer_weights.shape).copy())
        # Advance an offset instead of re-slicing the remaining tail
        offset += n_weights
    return reshape_weight

def createRandomClientList(clients_dictionary, n_round_clients):
    """Pick ``n_round_clients`` distinct client keys at random.

    Keys are drawn without replacement via ``random.sample``, so asking for
    more keys than the dictionary holds raises ``ValueError``.

    Args:
        clients_dictionary: Mapping whose keys identify the clients.
        n_round_clients: Number of keys to draw.

    Returns:
        A list with the selected keys.
    """
    candidate_ids = [*clients_dictionary.keys()]
    chosen = random.sample(candidate_ids, n_round_clients)
    return chosen

def train_model(cid, num_classes):
    """Train and evaluate the CNN on one client's data (local mode).

    The data returned by ``load_mnist_byCid`` is shuffled with a fixed seed,
    split 80/20 into training and test sets, used to fit the model for 10
    epochs (batch size 32), and the test loss and accuracy are printed.

    Args:
        cid: Client identifier forwarded to ``load_mnist_byCid``.
        num_classes: Number of classes; used for the one-hot encoding of the
            labels and for the size of the model's output layer.
    """
    data, labels = load_mnist_byCid(cid)
    labels = to_categorical(labels, num_classes=num_classes)

    input_shape = (256, 256, 1)  # Image size; matches the 256x256 resize in the loader
    model = define_model(input_shape, num_classes)

    # The loader labels images by their parent directory, so samples most
    # likely arrive grouped by class. A plain head/tail split would then leave
    # whole classes out of one of the two sets. Shuffle first, with a fixed
    # seed so the split stays reproducible between runs.
    num_samples = len(data)
    order = np.random.default_rng(42).permutation(num_samples)
    data, labels = data[order], labels[order]

    # Split data into training and testing sets
    split = int(0.8 * num_samples)
    x_train, x_test = data[:split], data[split:]
    y_train, y_test = labels[:split], labels[split:]

    # Train the model
    model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=10, batch_size=32)

    # Evaluate the model
    loss, accuracy = model.evaluate(x_test, y_test)
    print(f"Test Loss: {loss}, Test Accuracy: {accuracy}")

# Script entry point: train a single local client when run directly
if __name__ == "__main__":
    num_classes = 10  # Number of classes in MNIST
    client_id = 0  # Identifier of the client to train
    train_model(client_id, num_classes)

Run the script:

bash
python run_fedlearn_local.py

5.2 Running FedLearn in Collaborative Mode

Collaborative mode is used for actual distributed federated learning across multiple devices or organizations.

Step 1: Server Setup

On the server machine, create a script run_server.py:

python
import json
import cifer.fedlearn as fedlearn
import cifer.server as server

def load_config(config_path='config.json'):
    """
    Load the JSON configuration file.

    Args:
        config_path: Path of the JSON file to read.

    Returns:
        The parsed configuration, or ``None`` when the file is missing,
        cannot be read or is not valid JSON. A message is printed in each
        of those cases.
    """
    try:
        # JSON text is UTF-8; do not depend on the platform default encoding
        with open(config_path, 'r', encoding='utf-8') as config_file:
            return json.load(config_file)
    except FileNotFoundError:
        print(f"Configuration file {config_path} not found.")
    except json.JSONDecodeError:
        print("Error decoding JSON from the configuration file.")
    except Exception as e:
        # Last-resort handler kept so the script reports any other failure
        # instead of ending with a traceback.
        print(f"An error occurred: {e}")
    return None

def main():
    """Load the configuration and, if one was obtained, start the FedLearn server."""
    config = load_config()
    if not config:
        # Nothing usable was loaded (missing/invalid file or empty config)
        return

    # Build the FedLearn object from the configuration and start its server
    fedlearn.FedLearn(config).start_server()

# Start the server only when this file is executed as a script
if __name__ == "__main__":
    main()

Run the server:

bash
python run_server.py

Step 2: Client Setup

On each client machine, create a script run_client.py:

python
import models as models
import sys
import grpc
import fed_grpc_pb2_grpc
import fed_grpc_pb2
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical
import os
import signal
from concurrent import futures
from cifer import * 

class FedClient(fed_grpc_pb2_grpc.FederatedServiceServicer):
    """Federated-learning client: registers with the server, then serves RPCs.

    ``runClient`` registers this client with the server at ``server_adress``
    and, once accepted, starts a local gRPC server that exposes the handlers
    defined on this class (``sendRound``, ``startLearning``,
    ``getSampleSize``, ``modelValidation`` and ``killClient``).
    """

    def __init__(self, cid, x_train, x_test, y_train, y_test, model, server_adress, client_ip):
        """Store the client id, its data split, its model and the addresses.

        Args:
            cid: Client identifier; must be convertible with ``int()`` because
                the listening port is derived from it.
            x_train, y_train: Training samples and labels used by ``startLearning``.
            x_test, y_test: Held-out samples and labels used by ``modelValidation``.
            model: Model exposing ``fit``, ``evaluate``, ``get_weights`` and
                ``set_weights``.
            server_adress: Address of the federated server, passed to
                ``grpc.insecure_channel``.
            client_ip: Host part of the address this client listens on.
        """
        self.cid = cid
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train
        self.y_test = y_test
        self.model = model
        self.server_adress = server_adress
        self.client_ip = client_ip

    def __setClientChannel(self, client_channel):
        """Remember the local gRPC server so ``killClient`` can stop it."""
        self.client_channel = client_channel

    def __waitingForServer(self, port):
        """Start the local gRPC server on ``client_ip:port`` and block until it stops."""
        # A single worker thread: incoming calls are handled one at a time
        client_channel = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
        self.__setClientChannel(client_channel)
        fed_grpc_pb2_grpc.add_FederatedServiceServicer_to_server(self, client_channel)

        client_ip = self.client_ip + ':' + port

        client_channel.add_insecure_port(client_ip)
        client_channel.start()
        client_channel.wait_for_termination()

    def sendRound(self, request, context):
        """Print the round number carried by ``request`` and return an empty reply."""
        ac_round = request.round
        print()
        print(f"Starting {ac_round} round")
        return fed_grpc_pb2.void()

    def startLearning(self, request, context):
        """Train for one epoch on the local data and return the flattened weights."""
        self.model.fit(self.x_train, self.y_train, epochs=1, verbose=2)

        weights_list = aux.setWeightSingleList(self.model.get_weights())

        return fed_grpc_pb2.weightList(weight=weights_list)

    def getSampleSize(self, request, context):
        """Return the number of local training samples."""
        return fed_grpc_pb2.sampleSize(size=len(self.x_train))

    def modelValidation(self, request, context):
        """Load the weights from ``request`` into the model and evaluate it locally."""
        server_weight = request.weight
        # The weights arrive flat; reshape them to this model's layer shapes
        self.model.set_weights(aux.reshapeWeight(server_weight, self.model.get_weights()))
        # Index 1 of the evaluate() result is reported as the accuracy
        # (assumes the model was compiled with accuracy as its only metric)
        accuracy = self.model.evaluate(self.x_test, self.y_test, verbose=0)[1]

        print(f"Local accuracy with global weights: {accuracy}")

        return fed_grpc_pb2.accuracy(acc=accuracy)

    def killClient(self, request, context):
        """Stop the local gRPC server, which unblocks ``runClient``."""
        print()
        print(f"Call for closing channel - Killing Client {self.cid}")
        # NOTE(review): stop(0) is issued while this call is still being
        # answered; confirm the caller tolerates the reply being cut short.
        self.client_channel.stop(0)

        return fed_grpc_pb2.void()

    def runClient(self):
        """Register with the server and, if accepted, serve its commands.

        Blocks until the local gRPC server is stopped (see ``killClient``).
        """
        # The listening port is derived from the client id: 30000 + cid
        port = str(30000 + int(self.cid))

        # The context manager closes the registration channel when the
        # client finishes instead of leaving it open until process exit.
        with grpc.insecure_channel(self.server_adress) as server_channel:
            client = fed_grpc_pb2_grpc.FederatedServiceStub(server_channel)

            register_out = client.clientRegister(
                fed_grpc_pb2.registerArgs(ip=self.client_ip, port=port, cid=self.cid)
            )

            if register_out.connectedClient:
                print(f"Client Connected at round {register_out.round}, waiting for server commands...")
                self.__waitingForServer(port)
            else:
                print("This client couldn't connect with the server")

if __name__ == '__main__':
    input_shape = (28, 28, 1)
    num_classes = 10
    server_adress = 'localhost:8080'
    client_ip = '[::]'

    # The client id is the only command-line argument and it is mandatory
    try:
        cid = sys.argv[1]
    except IndexError:
        print("Missing argument! You need to pass: ClientId")
        # sys.exit is always available, whereas the bare exit() helper is
        # injected by the site module. The non-zero status reports the
        # failure to the calling shell.
        sys.exit(1)

    # Load this client's data and hold out 20% of it for validation
    x_train, y_train = aux.load_mnist_byCid(cid)
    x_train, x_test, y_train, y_test = train_test_split(x_train, y_train, test_size=0.2, random_state=42)

    # One-hot encode the labels
    y_train = to_categorical(y_train, num_classes)
    y_test = to_categorical(y_test, num_classes)

    model = aux.define_model(input_shape, num_classes)

    fed_client = FedClient(cid, x_train, x_test, y_train, y_test, model, server_adress, client_ip)
    fed_client.runClient()

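Run the client on each participating machine, passing that client's ID as the only argument (without it the script prints "Missing argument!" and exits). The ID must be an integer: the client listens on port 30000 + ID.

bash
python run_client.py <client_id>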

5.3 Example: Image Classification with MNIST

Here is a complete example of image classification with the MNIST dataset; the same structure can be adapted to other tasks, such as lung cancer detection. Use it when running FedLearn in Collaborative Mode (Section 5.2):

python
from imutils import paths
import numpy as np
import os
import cv2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D,Flatten,Dense
from tensorflow.keras.optimizers import SGD
from keras.utils import to_categorical
import random
from cifer import * 

def load_mnist_byCid(cid):
    """Load the images stored for one client under ``data/client_<cid>``.

    Each image is read as grayscale, resized to 28x28, given a trailing
    channel axis and divided by 255.0. The label of an image is the name of
    its parent directory.

    Args:
        cid: Client identifier; selects the ``data/client_<cid>`` directory.

    Returns:
        A ``(data, labels)`` tuple of NumPy arrays. ``labels`` holds the
        parent-directory names as strings.
    """
    data = []
    labels = []

    path = f"data/client_{cid}"

    for imgpath in paths.list_images(path):
        img_grayscale = cv2.imread(imgpath, cv2.IMREAD_GRAYSCALE)
        if img_grayscale is None:
            # cv2.imread returns None for files it cannot decode; passing
            # that on to cv2.resize would abort the whole load.
            print(f"Skipping unreadable image: {imgpath}")
            continue

        img_resized = cv2.resize(img_grayscale, (28, 28))
        # (H, W) -> (H, W, 1), then normalize
        img = np.expand_dims(img_resized, axis=-1) / 255.0
        # The containing folder's name is the label
        label = imgpath.split(os.path.sep)[-2]
        data.append(img)
        labels.append(label)
    return np.array(data), np.array(labels)

def define_model(input_shape,num_classes):
    """Create the compiled CNN used by the federated clients.

    Layers, in order: Conv2D (32 filters, 3x3, ReLU), MaxPool2D (2x2),
    Flatten, Dense (100 units, ReLU) and Dense (``num_classes`` units,
    softmax). The model is compiled with SGD (learning rate 0.01, momentum
    0.9), categorical cross-entropy loss and the accuracy metric.

    Args:
        input_shape: Shape of one input sample, passed to the first layer.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The compiled ``Sequential`` model.
    """
    stack = (
        Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=input_shape),
        MaxPool2D((2, 2)),
        Flatten(),
        Dense(100, activation='relu', kernel_initializer='he_uniform'),
        Dense(num_classes, activation='softmax'),
    )

    cnn = Sequential()
    for layer in stack:
        cnn.add(layer)

    sgd = SGD(learning_rate=0.01, momentum=0.9)
    cnn.compile(optimizer=sgd, loss='categorical_crossentropy', metrics=['accuracy'])
    return cnn

def setWeightSingleList(weights):
    """Serialize per-layer weights into one flat list of numbers.

    Args:
        weights: Sequence of array-likes, one entry per layer (for example
            the result of ``model.get_weights()``).

    Returns:
        A flat ``list`` with the values of every layer, layer after layer,
        each flattened in row-major order. Returns ``[]`` for an empty input.
    """
    if len(weights) == 0:
        # np.concatenate cannot be called with zero arrays
        return []
    weights_flat = [np.asarray(w).ravel() for w in weights]
    return np.concatenate(weights_flat).tolist()

def reshapeWeight(server_weight, client_weight):
    """Split a flat weight sequence back into arrays shaped like the model's layers.

    Values are taken from ``server_weight`` in order: the first
    ``prod(shape)`` values fill the first layer, the next run fills the
    second layer, and so on.

    Args:
        server_weight: Flat sequence of weight values.
        client_weight: Sequence of objects exposing ``.shape``, used only as
            shape templates.

    Returns:
        A list of NumPy arrays, one per entry of ``client_weight``.

    Raises:
        ValueError: If ``server_weight`` runs out of values before every
            layer is filled (raised by the reshape).
    """
    values = np.asarray(server_weight)
    reshape_weight = []
    start = 0
    for layer_weights in client_weight:
        # int(...) with an integer dtype: np.prod(()) would otherwise be the
        # float 1.0 and fail as a slice bound for scalar layers.
        n_weights = int(np.prod(layer_weights.shape, dtype=np.int64))
        stop = start + n_weights
        # Slice by offset rather than repeatedly copying the remaining tail;
        # copy() detaches the layer from the shared flat buffer.
        reshape_weight.append(values[start:stop].reshape(layer_weights.shape).copy())
        start = stop
    return reshape_weight

def createRandomClientList(clients_dictionary, n_round_clients):
    """Randomly choose which clients take part in a round.

    Args:
        clients_dictionary: Mapping whose keys identify the clients.
        n_round_clients: How many distinct keys to draw.

    Returns:
        A list of ``n_round_clients`` keys drawn without replacement
        (``random.sample`` raises ``ValueError`` if too many are requested).
    """
    client_keys = list(clients_dictionary.keys())
    selection = random.sample(client_keys, k=n_round_clients)
    return selection

This example demonstrates how to use Cifer's FedLearn for a typical image classification task using the MNIST dataset. It includes data loading, model definition, federated learning execution, and final model evaluation and saving.

Last updated