5. Running FedLearn and Code Example
This section provides step-by-step instructions and code examples for running FedLearn in both local and collaborative modes.
5.1 Running FedLearn in Local Mode
Local mode is useful for testing and development purposes. It simulates a federated learning environment on a single machine.
Step 1: Prepare Your Data
Ensure your dataset is prepared and located in the directory specified in your config.json
file.
Step 2: Configure FedLearn
Update your config.json
file for local mode:
{
"server_address": "localhost:8080",
"num_rounds": 10,
"strategy": "FedAvg",
"grpc_max_message_length": 104857600,
"mode": "local",
"num_clients": 3,
"local_epochs": 5,
"batch_size": 32,
"learning_rate": 0.01,
"model_name": "simple_cnn",
"dataset": "mnist",
"data_dir": "./data",
"output_dir": "./output"
}
Step 3: Run FedLearn
Create a Python script run_fedlearn_local.py
:
from cifer import *
from imutils import paths
import numpy as np
import os
import cv2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense
from tensorflow.keras.optimizers import SGD
from keras.utils import to_categorical
import random
def load_mnist_byCid(cid):
data = []
labels = []
path = f"data/standalone"
img_paths = list(paths.list_images(path))
for imgpath in img_paths:
# Load the image in grayscale mode
img_grayscale = cv2.imread(imgpath, cv2.IMREAD_GRAYSCALE)
# Resize the image to 256x256; this can be adjusted based on user requirements
img_resized = cv2.resize(img_grayscale, (256, 256))
# Convert to 3D array and normalize
img = np.expand_dims(img_resized, axis=-1) / 255.0
# Extract the label from the image name according to the folder
label = imgpath.split(os.path.sep)[-2]
# Append the data to the list
data.append(img)
labels.append(label)
return np.array(data), np.array(labels)
def define_model(input_shape, num_classes):
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=input_shape))
model.add(MaxPool2D((2, 2)))
model.add(Flatten())
model.add(Dense(100, activation='relu', kernel_initializer='he_uniform'))
model.add(Dense(num_classes, activation='softmax'))
opt = SGD(learning_rate=0.01, momentum=0.9)
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
return model
def setWeightSingleList(weights):
weights_flat = [w.flatten() for w in weights]
weights = np.concatenate(weights_flat).tolist()
return weights
def reshapeWeight(server_weight, client_weight):
reshape_weight = []
for layer_weights in client_weight:
n_weights = np.prod(layer_weights.shape)
reshape_weight.append(np.array(server_weight[:n_weights]).reshape(layer_weights.shape))
server_weight = server_weight[n_weights:]
return reshape_weight
def createRandomClientList(clients_dictionary, n_round_clients):
keys = list(clients_dictionary.keys())
return random.sample(keys, n_round_clients)
def train_model(cid, num_classes):
data, labels = load_mnist_byCid(cid)
labels = to_categorical(labels, num_classes=num_classes)
input_shape = (256, 256, 1) # Image size
model = define_model(input_shape, num_classes)
# Split data into training and testing sets
num_samples = len(data)
split = int(0.8 * num_samples)
x_train, x_test = data[:split], data[split:]
y_train, y_test = labels[:split], labels[split:]
# Train the model
model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=10, batch_size=32)
# Evaluate the model
loss, accuracy = model.evaluate(x_test, y_test)
print(f"Test Loss: {loss}, Test Accuracy: {accuracy}")
# Call the train_model function
if __name__ == "__main__":
client_id = 0 # Specify the client to train
num_classes = 10 # Number of classes in MNISTT
train_model(client_id, num_classes)
Run the script:
python run_fedlearn_local.py
5.2 Running FedLearn in Collaborative Mode
Collaborative mode is used for actual distributed federated learning across multiple devices or organizations.
Step 1: Server Setup
On the server machine, create a script run_server.py
:
import json
import cifer.fedlearn as fedlearn
import cifer.server as server
def load_config(config_path='config.json'):
"""
Load the configuration file.
"""
try:
with open(config_path, 'r') as config_file:
config = json.load(config_file)
return config
except FileNotFoundError:
print(f"Configuration file {config_path} not found.")
except json.JSONDecodeError:
print("Error decoding JSON from the configuration file.")
except Exception as e:
print(f"An error occurred: {e}")
def main():
config = load_config()
if config:
# Initialize FedLearn object
fl = fedlearn.FedLearn(config)
# Start the FedLearn server
fl.start_server()
if __name__ == "__main__":
main()
Run the server:
python run_server.py
Step 2: Client Setup
On each client machine, create a script run_client.py
:
import models as models
import sys
import grpc
import fed_grpc_pb2_grpc
import fed_grpc_pb2
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical
import os
import signal
from concurrent import futures
from cifer import *
class FedClient(fed_grpc_pb2_grpc.FederatedServiceServicer):
def __init__(self, cid, x_train, x_test, y_train, y_test, model, server_adress, client_ip):
self.cid = cid
self.x_train = x_train
self.x_test = x_test
self.y_train = y_train
self.y_test = y_test
self.model = model
self.server_adress = server_adress
self.client_ip = client_ip
def __setClientChannel(self,client_channel):
self.client_channel = client_channel
def __waitingForServer(self,port):
client_channel = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
self.__setClientChannel(client_channel)
fed_grpc_pb2_grpc.add_FederatedServiceServicer_to_server(self, client_channel)
client_ip = self.client_ip + ':' + port
client_channel.add_insecure_port(client_ip)
client_channel.start()
client_channel.wait_for_termination()
def sendRound(self, request, context):
ac_round = request.round
print()
print(f"Starting {ac_round} round")
return fed_grpc_pb2.void()
def startLearning(self, request, context):
self.model.fit(self.x_train, self.y_train, epochs=1, verbose=2)
weights_list = aux.setWeightSingleList(self.model.get_weights())
return fed_grpc_pb2.weightList(weight = (weights_list))
def getSampleSize(self, request, context):
return fed_grpc_pb2.sampleSize(size = (len(self.x_train)))
def modelValidation(self, request, context):
server_weight = request.weight
self.model.set_weights(aux.reshapeWeight(server_weight, self.model.get_weights()))
accuracy = self.model.evaluate(self.x_test, self.y_test, verbose=0)[1]
print(f"Local accuracy with global weights: {accuracy}")
return fed_grpc_pb2.accuracy(acc = (accuracy))
def killClient(self, request, context):
print()
print(f"Call for closing channel - Killing Client {self.cid}")
self.client_channel.stop(0)
return fed_grpc_pb2.void()
def runClient(self):
server_channel = grpc.insecure_channel(self.server_adress)
client = fed_grpc_pb2_grpc.FederatedServiceStub(server_channel)
port = self.server_adress.split(':')[1]
port = str(30000 + int(self.cid))
register_out = client.clientRegister(fed_grpc_pb2.registerArgs(ip=self.client_ip, port=port, cid=self.cid))
if register_out.connectedClient:
print(f"Client Connected at round {register_out.round}, waiting for server commands...")
self.__waitingForServer(port)
else:
print("This client couldn't connect with the server")
if __name__ == '__main__':
cid = -1
input_shape = (28, 28, 1)
num_classes = 10
server_adress = 'localhost:8080'
client_ip = '[::]'
try:
cid = sys.argv[1]
except IndexError:
print("Missing argument! You need to pass: ClientId")
exit()
x_train, y_train = aux.load_mnist_byCid(cid)
x_train, x_test, y_train, y_test = train_test_split(x_train, y_train, test_size=0.2, random_state=42)
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)
model = aux.define_model(input_shape,num_classes)
fed_client = FedClient(cid, x_train, x_test, y_train, y_test, model, server_adress, client_ip)
fed_client.runClient()
Run the client on each participating machine:
python run_client.py
5.3 Example: Image Classification with MNIST
Here's a complete example using the MNIST dataset for image classification, adapted for lung cancer detection, and to be used when running FedLearn in Collaborative Mode (5.2):
from imutils import paths
import numpy as np
import os
import cv2
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D,Flatten,Dense
from tensorflow.keras.optimizers import SGD
from keras.utils import to_categorical
import random
from cifer import *
def load_mnist_byCid(cid):
data = []
labels = []
path = f"data/client_{cid}"
img_paths = list(paths.list_images(path))
for imgpath in img_paths:
img_grayscale = cv2.imread(imgpath, cv2.IMREAD_GRAYSCALE)
img_resized = cv2.resize(img_grayscale, (28, 28))
img = np.expand_dims(img_resized, axis=-1) / 255.0
label = imgpath.split(os.path.sep)[-2]
data.append(img)
labels.append(label)
return np.array(data), np.array(labels)
def define_model(input_shape,num_classes):
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=input_shape))
model.add(MaxPool2D((2, 2)))
model.add(Flatten())
model.add(Dense(100, activation='relu', kernel_initializer='he_uniform'))
model.add(Dense(num_classes, activation='softmax'))
opt = SGD(learning_rate=0.01, momentum=0.9)
model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])
return model
def setWeightSingleList(weights):
weights_flat = [w.flatten() for w in weights]
weights = np.concatenate(weights_flat).tolist()
return weights
def reshapeWeight(server_weight, client_weight):
reshape_weight = []
for layer_weights in client_weight:
n_weights = np.prod(layer_weights.shape)
reshape_weight.append(np.array(server_weight[:n_weights]).reshape(layer_weights.shape))
server_weight = server_weight[n_weights:]
return reshape_weight
def createRandomClientList(clients_dictionary, n_round_clients):
keys = list(clients_dictionary.keys())
return random.sample(keys, n_round_clients)
This example demonstrates how to use Cifer's FedLearn for a typical image classification task using the MNIST dataset. It includes data loading, model definition, federated learning execution, and final model evaluation and saving.
Last updated